Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions sdks/python/apache_beam/yaml/tests/explode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:
# Explode on one list-valued field. Only `produce` is listed in `fields`, so
# the expected rows carry a scalar `produce` while `season` remains a list.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {produce: ['Strawberry', 'Corn'], season: ['spring', 'summer']}
    - type: Explode
      name: Flatten lists
      config:
        fields:
        - produce
    - type: AssertEqual
      config:
        elements:
        - produce: 'Strawberry'
          season: ['spring', 'summer']
        - produce: 'Corn'
          season: ['spring', 'summer']

# Explode on two fields with cross_product disabled. The expected rows pair
# the two lists by position: first with first, second with second.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {produce: ['Strawberry', 'Corn'], season: ['spring', 'summer']}
    - type: Explode
      name: Flatten lists
      config:
        fields:
        - produce
        - season
        cross_product: false
    - type: AssertEqual
      config:
        elements:
        - produce: 'Strawberry'
          season: 'spring'
        - produce: 'Corn'
          season: 'summer'

# Explode on two fields with cross_product enabled. The expected rows are
# every (produce, season) combination: 2 x 2 = 4 rows.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {produce: ['Strawberry', 'Corn'], season: ['spring', 'summer']}
    - type: Explode
      name: Flatten lists
      config:
        fields:
        - produce
        - season
        cross_product: true
    - type: AssertEqual
      config:
        elements:
        - produce: 'Strawberry'
          season: 'spring'
        - produce: 'Strawberry'
          season: 'summer'
        - produce: 'Corn'
          season: 'spring'
        - produce: 'Corn'
          season: 'summer'

147 changes: 147 additions & 0 deletions sdks/python/apache_beam/yaml/tests/filter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Fixtures for this test file. TEMP_DIR is declared with the type
# tempfile.TemporaryDirectory and is referenced by the pipelines below through
# the `{TEMP_DIR}` placeholder in their file paths.
fixtures:
- name: TEMP_DIR
  type: "tempfile.TemporaryDirectory"

pipelines:
# Filter driven by an inline Python expression. Of the five input rows, only
# the Monitor is both in the "Electronics" category and priced above 100.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
        - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99}
    - type: Filter
      config:
        language: python
        keep: category == "Electronics" and price > 100
    - type: AssertEqual
      config:
        elements:
        - transaction_id: "T0302"
          product_name: "Monitor"
          category: "Electronics"
          price: 249.99

# Same predicate as the Python-expression case, but supplied as a `callable`
# block containing a function definition that receives the row and reads its
# fields as attributes.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
        - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99}
    - type: Filter
      config:
        language: python
        keep:
          callable: |
            def my_filter(row):
              return row.category == "Electronics" and row.price > 100
    - type: AssertEqual
      config:
        elements:
        - transaction_id: "T0302"
          product_name: "Monitor"
          category: "Electronics"
          price: 249.99

# Filter driven by an inline Java expression. The predicate matches the
# Python cases (category is "Electronics" and price is above 100), so the
# Monitor row is again the only one expected.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
        - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99}
    - type: Filter
      config:
        language: java
        keep: category.equals("Electronics") && price > 100
    - type: AssertEqual
      config:
        elements:
        - transaction_id: "T0302"
          product_name: "Monitor"
          category: "Electronics"
          price: 249.99

# Java predicate supplied as a `callable` block: a class implementing
# Function<Row, Boolean> whose apply() reads the "category" and "price"
# fields from the Row by name.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
        - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99}
    - type: Filter
      config:
        language: java
        keep:
          callable: |
            import org.apache.beam.sdk.values.Row;
            import java.util.function.Function;
            public class MyFunction implements Function<Row, Boolean> {
              public Boolean apply(Row row) {
                return row.getString("category").equals("Electronics") && row.getDouble("price") > 100;
              }
            }
    - type: AssertEqual
      config:
        elements:
        - transaction_id: "T0302"
          product_name: "Monitor"
          category: "Electronics"
          price: 249.99


# Filter with error handling. The `error_handling` config names an extra
# output, `error_output`, on FilterWithCategory; that output is wired into
# WriteToJson, which writes under the TEMP_DIR fixture.
# NOTE(review): `Electronics` in the `keep` expression is a bare name, not a
# quoted string - presumably deliberate so the row fails and lands on the
# error output; confirm before "fixing" it.
- pipeline:
    type: composite
    transforms:
    - type: Create
      config:
        elements:
        - {category: "Electronics", price: 59.99, product_name: "Headphones", transaction_id: "T0012"}
    - type: Filter
      name: FilterWithCategory
      input: Create
      config:
        language: python
        keep: category == Electronics
        error_handling:
          output: error_output
    - type: WriteToJson
      name: WriteErrorsToJson
      input: FilterWithCategory.error_output
      config:
        path: "{TEMP_DIR}/error.json"

# Reads back the files matching the error.json prefix that the preceding
# pipeline wrote under TEMP_DIR (the trailing `*` makes the path a glob).
# NOTE(review): no AssertEqual follows the read, so this step appears to
# check only that the files can be read - confirm that is the intent.
- pipeline:
    type: chain
    transforms:
    - type: ReadFromJson
      config:
        path: "{TEMP_DIR}/error.json*"
125 changes: 125 additions & 0 deletions sdks/python/apache_beam/yaml/tests/flatten.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:
# Flatten over three named inputs: CreateA (2 rows), CreateB (1 row) and
# CreateC (2 rows). The assertion expects all five rows on the single output.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateA
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
    - type: Create
      name: CreateB
      config:
        elements:
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
    - type: Create
      name: CreateC
      config:
        elements:
        - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99}
    - type: Flatten
      input:
      - CreateA
      - CreateB
      - CreateC
    - type: AssertEqual
      input: Flatten
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
        - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99}

# Flatten fed by a single input, given as a plain name rather than a list.
# The expected output is the same three rows that CreateA produces.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateA
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}
    - type: Flatten
      input: CreateA
    - type: AssertEqual
      input: Flatten
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9}

# Flatten where each input repeats its own row twice. The assertion expects
# all four rows, so duplicates within an input are kept, not collapsed.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateA
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
    - type: Create
      name: CreateB
      config:
        elements:
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
    - type: Flatten
      input:
      - CreateA
      - CreateB
    - type: AssertEqual
      input: Flatten
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}

# Flatten where the same two rows appear in both inputs (listed in opposite
# order). The assertion expects each row twice, so duplicates across inputs
# are kept as well.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateA
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
    - type: Create
      name: CreateB
      config:
        elements:
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
    - type: Flatten
      input:
      - CreateA
      - CreateB
    - type: AssertEqual
      input: Flatten
      config:
        elements:
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
        - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99}
Loading
Loading