Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jobs:
- name: run PreCommit Yaml Xlang Direct script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:yamlIntegrationTests
gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we add more tests, it may be worth splitting out the workflow and creating a "PreCommit YAML Xlang" variant that exercises more tests on a postsubmit schedule, to keep the presubmit run time reasonable. This could be a follow-up.

- name: Archive Python Test Results
uses: actions/upload-artifact@v4
if: failure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3021,9 +3021,15 @@ class BeamModulePlugin implements Plugin<Project> {
dependsOn ':sdks:python:sdist'
doLast {
def distTarBall = "${pythonRootDir}/build/apache-beam.tar.gz"
def packages = "gcp,test,aws,azure,dataframe"
def extra = project.findProperty('beamPythonExtra')
if (extra) {
packages += ",${extra}"
}

project.exec {
executable 'sh'
args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[gcp,test,aws,azure,dataframe]"
args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[${packages}]"
}
}
}
Expand Down
84 changes: 84 additions & 0 deletions sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:
# Assign timestamp to beam row element
- pipeline:
type: chain
transforms:
- type: Create
name: CreateVisits
config:
elements:
- user: alice
new_time_stamp: 1
- user: alice
new_time_stamp: 3
- user: bob
new_time_stamp: 7
- type: AssignTimestamps
config:
timestamp: new_time_stamp
- type: ExtractWindowingInfo
config:
fields: [timestamp]
- type: MapToFields
config:
language: python
fields:
user: user
timestamp: timestamp
- type: AssertEqual
config:
elements:
- {user: "alice", timestamp: 1}
- {user: "alice", timestamp: 3}
- {user: "bob", timestamp: 7}

# Assign timestamp to beam row element with error_handling
- pipeline:
type: composite
transforms:
- type: Create
name: CreateVisits
config:
elements:
- {user: alice, timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssignTimestamps
input: CreateVisits
config:
timestamp: timestamp
error_handling:
output: invalid_rows
- type: MapToFields
input: AssignTimestamps.invalid_rows
config:
language: python
fields:
user: "element.user"
timestamp: "element.timestamp"
- type: AssertEqual
input: MapToFields
config:
elements:
- {user: "alice", timestamp: "not-valid"}
- type: AssertEqual
input: AssignTimestamps
config:
elements:
- {user: bob, timestamp: 3}
66 changes: 66 additions & 0 deletions sdks/python/apache_beam/yaml/tests/create.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:
# Simple Create with element list
- pipeline:
type: chain
transforms:
- type: Create
config:
elements: [1,2,3,4,5]
- type: AssertEqual
config:
elements:
- {element: 1}
- {element: 2}
- {element: 3}
- {element: 4}
- {element: 5}

# Simple Create with more complex beam row
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {first: 0, second: [1,2,3]}
- {first: 1, second: [4,5,6]}
- type: AssertEqual
config:
elements:
- {first: 0, second: [1,2,3]}
- {first: 1, second: [4,5,6]}

# Simple Create with reshuffle explicitly disabled (reshuffle: false)
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {first: 0, second: [1,2,3]}
- {first: 1, second: [4,5,6]}
- {first: 2, second: [7,8,9]}
reshuffle: false
- type: AssertEqual
config:
elements:
- {first: 0, second: [1,2,3]}
- {first: 1, second: [4,5,6]}
- {first: 2, second: [7,8,9]}
131 changes: 131 additions & 0 deletions sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:
# Extract windowing info with no fields specified, which yields the default fields
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {label: "11a", rank: 0}
- {label: "37a", rank: 1}
- {label: "389a", rank: 2}
- type: ExtractWindowingInfo
- type: MapToFields
config:
language: python
fields:
label:
callable: "lambda x: x.label"
rank:
callable: "lambda x: x.rank"
timestamp:
callable: "lambda x: str(x.timestamp)"
window_start:
callable: "lambda x: str(x.window_start)"
window_end:
callable: "lambda x: str(x.window_end)"
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global window is the default. The min/max timestamps of the global window are also an internal implementation detail (?). Consider assigning a fixed window and then asserting on it (this could be a follow-up).

- {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
- {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}


# Simply extract windowing info with all available fields
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {label: "11a", rank: 0}
- {label: "37a", rank: 1}
- {label: "389a", rank: 2}
- type: ExtractWindowingInfo
config:
fields: [timestamp, window_start, window_end, window_string, window_type, window_object, pane_info]
- type: MapToFields
config:
language: python
fields:
label:
callable: "lambda x: x.label"
rank:
callable: "lambda x: x.rank"
timestamp:
callable: "lambda x: str(x.timestamp)"
window_start:
callable: "lambda x: str(x.window_start)"
window_end:
callable: "lambda x: str(x.window_end)"
window_string:
callable: "lambda x: str(x.window_string)"
window_type:
callable: "lambda x: str(x.window_type)"
window_object:
callable: "lambda x: str(x.window_object)"
pane_info:
callable: "lambda x: str(x.pane_info)"
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}


# Simply extract windowing info with a few fields renamed
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {label: "11a", rank: 0}
- {label: "37a", rank: 1}
- {label: "389a", rank: 2}
- type: ExtractWindowingInfo
config:
fields:
ts: timestamp
ws: window_start
pane: pane_info
- type: MapToFields
config:
language: python
fields:
label:
callable: "lambda x: x.label"
rank:
callable: "lambda x: x.rank"
ts:
callable: "lambda x: str(x.ts)"
ws:
callable: "lambda x: str(x.ws)"
pane:
callable: "lambda x: str(x.pane)"
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "37a", rank: 1, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "389a", rank: 2, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}

Loading
Loading