From 87698d868ae9b6fbc53b00c95c9aa65f840cf345 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Mon, 17 Mar 2025 10:08:20 +0000 Subject: [PATCH 01/13] Add 3 examples in playground SQL transform, schema transform and Composed Combine. All applying the same combiners (Min,Max,Sum) on the input. --- .../examples/CoCombineTransformExample.java | 141 ++++++++++++++++++ .../beam/examples/SchemaTransformExample.java | 110 ++++++++++++++ .../beam/examples/SqlTransformExample.java | 104 +++++++++++++ 3 files changed, 355 insertions(+) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/SqlTransformExample.java diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java new file mode 100644 index 000000000000..a955a218f589 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +// beam-playground: +// name: CombineFns.ComposedCombineFn +// description: Demonstration of Composed Combine transform usage. +// multifile: false +// default_example: false +// context_line: 64 +// categories: +// - Schemas +// - Combiners +// complexity: MEDIUM +// tags: +// - transforms +// - numbers + +// gradle clean execute -DmainClass=org.apache.beam.examples.CoCombineTransformExample --args="--runner=DirectRunner" -Pdirect-runner + +import java.util.ArrayList; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineFns; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An example that uses Composed combiners to apply multiple combiners (Sum, Min, Max) on the + * input PCollection. + * + *
 * <p>For a detailed documentation of Composed Combines, see
 * You may obtain a copy of the License at
 * <p>For a detailed documentation of Schemas, see
 * <p>Using SQL syntax to define a transform that can be integrated in a Java pipeline.
 * <p>For a detailed documentation of Beam SQL, see
o.add(KV.of(minTag.getId(), e.get(minTag))); + o.add(KV.of(maxTag.getId(), e.get(maxTag))); + o.add(KV.of(sumTag.getId(), e.get(sumTag))); + c.output(KV.of(c.element().getKey(),o)); } })); From b5966da8f5e5fd8d6a36986bc8ed4fd92802b044 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Mon, 17 Mar 2025 11:54:08 +0000 Subject: [PATCH 04/13] spotlessApply and wrapping the combiners in CoCombineTransform example because in real life examples this is almost always needed. --- .../examples/CoCombineTransformExample.java | 160 +++++++++++++----- .../beam/examples/SchemaTransformExample.java | 63 ++++--- .../beam/examples/SqlTransformExample.java | 49 +++--- 3 files changed, 170 insertions(+), 102 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java index d57ab877ba76..32bba9bf6069 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java @@ -31,7 +31,8 @@ // - transforms // - numbers -// gradle clean execute -DmainClass=org.apache.beam.examples.CoCombineTransformExample --args="--runner=DirectRunner" -Pdirect-runner +// gradle clean execute -DmainClass=org.apache.beam.examples.CoCombineTransformExample +// --args="--runner=DirectRunner" -Pdirect-runner import java.util.ArrayList; import org.apache.beam.sdk.Pipeline; @@ -53,69 +54,138 @@ import org.slf4j.LoggerFactory; /** - * An example that uses Composed combiners to apply multiple combiners (Sum, Min, Max) on the - * input PCollection. - * + * An example that uses Composed combiners to apply multiple combiners (Sum, Min, Max) on the input + * PCollection. + * *
 * <p>For a detailed documentation of Composed Combines, see
 * <p>Note that the combiners are wrapped in a DropNullFn because, when combining, the input usually has
*/ SimpleFunction identityFn = - new SimpleFunction() { - @Override - public Long apply(Long input) { + new SimpleFunction() { + @Override + public Long apply(Long input) { return input; - }}; + } + }; // tuple tags to identify the outputs of the Composed Combine TupleTag sumTag = new TupleTag("sum_n"); TupleTag minTag = new TupleTag("min_n"); TupleTag maxTag = new TupleTag("max_n"); - CombineFns.ComposedCombineFn composedCombine = - CombineFns.compose() - .with(identityFn, Sum.ofLongs(), sumTag) //elements filtered by the identityFn, will be combined in a Sum and the output will be tagged - .with(identityFn, Min.ofLongs(), minTag) - .with(identityFn, Max.ofLongs(), maxTag) - ; - - PCollection> combinedData = - inputKV - .apply("Combine all", Combine.perKey(composedCombine)); - + CombineFns.ComposedCombineFn composedCombine = + CombineFns.compose() + .with( + identityFn, + new DropNullFn(Sum.ofLongs()), + sumTag) // elements filtered by the identityFn, will be combined in a Sum and the + // output will be tagged + .with(identityFn, new DropNullFn(Min.ofLongs()), minTag) + .with(identityFn, new DropNullFn(Max.ofLongs()), maxTag); + + PCollection> combinedData = + inputKV.apply("Combine all", Combine.perKey(composedCombine)); + // transform the CoCombineResult output into a KV format, simpler to use for printing - PCollection>>> result = combinedData - .apply(ParDo.of( - new DoFn, KV>>>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - CombineFns.CoCombineResult e = c.element().getValue(); - ArrayList> o = new ArrayList>(); - o.add(KV.of(minTag.getId(), e.get(minTag))); - o.add(KV.of(maxTag.getId(), e.get(maxTag))); - o.add(KV.of(sumTag.getId(), e.get(sumTag))); - c.output(KV.of(c.element().getKey(),o)); - } - })); - + PCollection>>> result = + combinedData.apply( + ParDo.of( + new DoFn< + KV, KV>>>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + CombineFns.CoCombineResult e = c.element().getValue(); + ArrayList> o = new ArrayList>(); + o.add(KV.of(minTag.getId(), e.get(minTag))); + o.add(KV.of(maxTag.getId(), e.get(maxTag))); + o.add(KV.of(sumTag.getId(), e.get(sumTag))); + c.output(KV.of(c.element().getKey(), o)); + } + })); + // [END main_section] // Log values result.apply(ParDo.of(new LogOutput<>("PCollection values after CoCombine transform: "))); @@ -136,4 +206,4 @@ public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } -} \ No newline at end of file +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java index 4fa63ddd4541..d708fa558f1d 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java @@ -31,29 +31,30 @@ // - transforms // - numbers -//gradle clean execute -DmainClass=org.apache.beam.examples.SchemaTransformExample --args="--runner=DirectRunner" -Pdirect-runner +// gradle clean execute -DmainClass=org.apache.beam.examples.SchemaTransformExample +// --args="--runner=DirectRunner" -Pdirect-runner import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.Group; +import org.apache.beam.sdk.schemas.transforms.Select; import 
org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.transforms.Group; -import org.apache.beam.sdk.schemas.transforms.Select; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.values.Row; /** - * An example that uses Schema transforms to apply multiple combiners (Sum, Min, Max) on the - * input PCollection. - * + * An example that uses Schema transforms to apply multiple combiners (Sum, Min, Max) on the input + * PCollection. + * *
 * <p>For a detailed documentation of Schemas, see

 * <p>Using SQL syntax to define a transform that can be integrated in a Java pipeline.

 * <p>Using SQL syntax to define a transform that can be integrated in a Java pipeline.

 * <p>For a detailed documentation of Beam SQL, see
d75a8697264e..7ec38682a7cc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java @@ -22,7 +22,7 @@ // description: Demonstration of Composed Combine transform usage. // multifile: false // default_example: false -// context_line: 64 +// context_line: 143 // categories: // - Schemas // - Combiners @@ -71,6 +71,9 @@ *
 * <p>Note that the combiners are wrapped in a DropNullFn because, when combining, the input usually has
 * <p>For a detailed documentation of Composed Combines, see <a
 * <p>Note that the combiners are wrapped in a DropNullFn because, when combining, the input usually has
+ */ + +import groovy.json.JsonOutput + +plugins { + id 'java' + id 'org.apache.beam.module' + id 'com.github.johnrengelman.shadow' +} + +applyJavaNature( + exportJavadoc: false, + automaticModuleName: 'org.apache.beam.examples.sql', +) +provideIntegrationTestingDependencies() +enableJavaPerformanceTesting() + +description = "Apache Beam :: Examples :: Java" +ext.summary = """Apache Beam SDK provides a simple, Java-based +interface for processing virtually any size data. This +artifact includes all Apache Beam Java SDK examples.""" + +/** Define the list of runners which execute a precommit test. + * Some runners are run from separate projects, see the preCommit task below + * for details. + */ +def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"] +for (String runner : preCommitRunners) { + configurations.create(runner + "PreCommit") +} +configurations.sparkRunnerPreCommit { + // Ban certain dependencies to prevent a StackOverflow within Spark + // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14 + exclude group: "org.slf4j", module: "jul-to-slf4j" + exclude group: "org.slf4j", module: "slf4j-jdk14" +} + +dependencies { + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation library.java.vendored_guava_32_1_2_jre + if (project.findProperty('testJavaVersion') == '21' || JavaVersion.current().compareTo(JavaVersion.VERSION_21) >= 0) { + // this dependency is a provided dependency for kafka-avro-serializer. It is not needed to compile with Java<=17 + // but needed for compile only under Java21, specifically, required for extending from AbstractKafkaAvroDeserializer + compileOnly library.java.kafka + } + implementation library.java.kafka_clients + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:extensions:google-cloud-platform-core") + implementation project(":sdks:java:extensions:python") + implementation project(":sdks:java:extensions:sql") + implementation project(":sdks:java:io:google-cloud-platform") + implementation project(":sdks:java:managed") + implementation project(":sdks:java:extensions:ml") + implementation library.java.bigdataoss_util + implementation library.java.google_api_client + implementation library.java.google_api_services_bigquery + implementation library.java.google_api_services_pubsub + implementation library.java.google_auth_library_credentials + implementation library.java.google_auth_library_oauth2_http + implementation library.java.google_cloud_datastore_v1_proto_client + implementation library.java.google_code_gson + implementation library.java.google_http_client + implementation library.java.google_oauth_client + implementation library.java.jackson_databind + implementation library.java.joda_time + implementation library.java.protobuf_java + implementation library.java.proto_google_cloud_bigtable_v2 + implementation library.java.proto_google_cloud_datastore_v1 + implementation library.java.slf4j_api + implementation library.java.commons_io + implementation library.java.commons_csv + runtimeOnly project(path: ":runners:direct-java", configuration: "shadow") + implementation library.java.vendored_grpc_1_69_0 + implementation library.java.vendored_guava_32_1_2_jre + implementation "com.google.api.grpc:proto-google-cloud-language-v1:1.81.4" + implementation "org.apache.commons:commons-lang3:3.9" + implementation "org.apache.httpcomponents:httpclient:4.5.13" + implementation "org.apache.httpcomponents:httpcore:4.4.13" + 
implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1" + implementation "com.fasterxml.jackson.core:jackson-core:2.14.1" + runtimeOnly library.java.hadoop_client + runtimeOnly library.java.bigdataoss_gcs_connector + testImplementation project(path: ":runners:direct-java", configuration: "shadow") + testImplementation project(":sdks:java:io:google-cloud-platform") + testImplementation project(":sdks:java:extensions:ml") + testImplementation library.java.google_cloud_bigquery + testImplementation library.java.hamcrest + testImplementation library.java.junit + testImplementation library.java.mockito_core + testImplementation library.java.testcontainers_gcloud + + // Add dependencies for the PreCommit configurations + // For each runner a project level dependency on the examples project. + for (String runner : preCommitRunners) { + delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntimeMigration")) + } + directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow") + flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}") + sparkRunnerPreCommit project(":runners:spark:3") + sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system") + + // Add dependency if requested on command line for runner + if (project.hasProperty("runnerDependency")) { + runtimeOnly project(path: project.getProperty("runnerDependency")) + } +} + +/* + * Create a ${runner}PreCommit task for each runner which runs a set + * of integration tests for WordCount and WindowedWordCount. + */ +def preCommitRunnerClass = [ + directRunner: "org.apache.beam.runners.direct.DirectRunner", + flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner", + sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner", +] +def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' +def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/' + +for (String runner : preCommitRunners) { + tasks.create(name: runner + "PreCommit", type: Test) { + def preCommitBeamTestPipelineOptions = [ + "--project=${gcpProject}", + "--tempRoot=${gcsTempRoot}", + "--runner=" + preCommitRunnerClass[runner], + ] + classpath = configurations."${runner}PreCommit" + forkEvery 1 + maxParallelForks 4 + systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions) + } +} + +/* Define a common precommit task which depends on all the individual precommits. */ +task preCommit() { + for (String runner : preCommitRunners) { + dependsOn runner + "PreCommit" + } +} + +tasks.create(name:"execute", type:JavaExec) { + main = project.hasProperty("mainClass") ? project.getProperty("mainClass") : "NONE" + classpath = sourceSets.main.runtimeClasspath + systemProperties System.getProperties() + args project.hasProperty("exec.args") ? 
project.getProperty("exec.args").split() : [] +} \ No newline at end of file diff --git a/examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java rename to examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java index 861b35b463e1..d50490c91b03 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/SchemaTransformExample.java +++ b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples; +package org.apache.beam.examples.sql; // beam-playground: // name: Group.ByFields diff --git a/examples/java/src/main/java/org/apache/beam/examples/SqlTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java similarity index 98% rename from examples/java/src/main/java/org/apache/beam/examples/SqlTransformExample.java rename to examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java index ec563783258f..1f8ae1973b4e 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/SqlTransformExample.java +++ b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples; +package org.apache.beam.examples.sql; // beam-playground: // name: SqlTransform @@ -24,7 +24,7 @@ // default_example: false // context_line: 60 // categories: -// - SQL +// - Beam SQL // - Combiners // complexity: BASIC // tags: diff --git a/settings.gradle.kts b/settings.gradle.kts index 9bf86c9bf2f1..9dd77f9e18f4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -78,6 +78,7 @@ include(":examples:java:cdap:hubspot") include(":examples:java:cdap:salesforce") include(":examples:java:cdap:servicenow") include(":examples:java:cdap:zendesk") +include(":examples:java:sql") include(":examples:java:webapis") include(":examples:kotlin") include(":examples:multi-language") From a13cd535d2bba787896ab750a86d72d92e4a291d Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Tue, 8 Apr 2025 16:47:01 +0000 Subject: [PATCH 11/13] add README.md for SQL & Schema tranform examples --- examples/java/sql/README.md | 51 +++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 examples/java/sql/README.md diff --git a/examples/java/sql/README.md b/examples/java/sql/README.md new file mode 100644 index 000000000000..d27b8ce7025c --- /dev/null +++ b/examples/java/sql/README.md @@ -0,0 +1,51 @@ + + +# Example Pipelines for Beam SQL and Schema Transforms + +The examples included in this module serve to demonstrate the basic +functionality of Apache Beam SQL, and act as starting points for +the development of more complex pipelines. + +## SQL transform + +An example that leverage the powerful SQL syntax in Beam [SqlTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/sql/SqlTransform.html) directly in your Beam pipelines. 
+ + +[`SqlTransformExample`](https://github.com/apache/beam/blob/master/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java) is a simple pipeline that calculates multiple metrics per key : Min, Max and Sum. + + +## Schema transform + +[Beam Schemas](https://beam.apache.org/documentation/programming-guide/#schemas) offer a flexible way of writing in code the same operations that are so easy to express in SQL. + +[`SchemaTransformExample`](https://github.com/apache/beam/blob/master/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java) is a simple pipeline that calculates multiple metrics per key : Min, Max and Sum. + +## Running Examples + +See [Apache Beam WordCount Example](https://beam.apache.org/get-started/wordcount-example/) for information on running these examples. + +Gradle cmd line can be similar to: + +`./gradlew clean :examples:java:sql:execute --args="--runner=DirectRunner" -Pdirect-runner -PmainClass=org.apache.beam.examples.sql.SqlTransformExample` + +## Beyond SQL and Schemas + +Both SQL and Schema Transforms leverage Row type. +The same results can be achieved directly using Beam transforms on a KV input PCollection. See [Composed Cobiners example](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java). From e772344d44e7f030021d795cb29f3486c3075e42 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Tue, 8 Apr 2025 16:51:33 +0000 Subject: [PATCH 12/13] adding subproject to build.gradle.kts to include the build in precommit test --- build.gradle.kts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index 8dcdc14f04e7..a665d04b527e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -244,6 +244,8 @@ tasks.register("javaPreCommit") { dependsOn(":beam-validate-runner:build") dependsOn(":examples:java:build") dependsOn(":examples:java:preCommit") + dependsOn(":examples:java:sql:build") + dependsOn(":examples:java:sql:preCommit") dependsOn(":examples:java:twitter:build") dependsOn(":examples:java:twitter:preCommit") dependsOn(":examples:multi-language:build") From 80f46c4fd5aba85cc3a52f5b32b86cccdd77f103 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Tue, 8 Apr 2025 19:28:32 +0000 Subject: [PATCH 13/13] gradle dependency scrub --- examples/java/sql/build.gradle | 40 ---------------------------------- 1 file changed, 40 deletions(-) diff --git a/examples/java/sql/build.gradle b/examples/java/sql/build.gradle index 3209277dd25f..466e2d0f429d 100644 --- a/examples/java/sql/build.gradle +++ b/examples/java/sql/build.gradle @@ -53,53 +53,13 @@ configurations.sparkRunnerPreCommit { dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) - implementation library.java.vendored_guava_32_1_2_jre - if (project.findProperty('testJavaVersion') == '21' || JavaVersion.current().compareTo(JavaVersion.VERSION_21) >= 0) { - // this dependency is a provided dependency for kafka-avro-serializer. 
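To give a flavor of the API, here is a minimal sketch of the same aggregation query. It assumes `input` is a `PCollection<Row>` whose schema defines the integer fields `k` and `n`, as set up in `SqlTransformExample`:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// A minimal sketch: aggregate a schema-aware PCollection<Row> with Beam SQL.
// The single input is registered under the table name PCOLLECTION.
PCollection<Row> result =
    input.apply(
        SqlTransform.query(
            "select k, min(n) as min_n, max(n) as max_n, sum(n) as sum_n"
                + " from PCOLLECTION group by k"));
```
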
It is not needed to compile with Java<=17 - // but needed for compile only under Java21, specifically, required for extending from AbstractKafkaAvroDeserializer - compileOnly library.java.kafka - } - implementation library.java.kafka_clients implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:extensions:google-cloud-platform-core") - implementation project(":sdks:java:extensions:python") implementation project(":sdks:java:extensions:sql") - implementation project(":sdks:java:io:google-cloud-platform") - implementation project(":sdks:java:managed") - implementation project(":sdks:java:extensions:ml") - implementation library.java.bigdataoss_util - implementation library.java.google_api_client - implementation library.java.google_api_services_bigquery - implementation library.java.google_api_services_pubsub - implementation library.java.google_auth_library_credentials - implementation library.java.google_auth_library_oauth2_http - implementation library.java.google_cloud_datastore_v1_proto_client - implementation library.java.google_code_gson - implementation library.java.google_http_client - implementation library.java.google_oauth_client - implementation library.java.jackson_databind - implementation library.java.joda_time - implementation library.java.protobuf_java - implementation library.java.proto_google_cloud_bigtable_v2 - implementation library.java.proto_google_cloud_datastore_v1 implementation library.java.slf4j_api - implementation library.java.commons_io - implementation library.java.commons_csv runtimeOnly project(path: ":runners:direct-java", configuration: "shadow") - implementation library.java.vendored_grpc_1_69_0 - implementation library.java.vendored_guava_32_1_2_jre - implementation "com.google.api.grpc:proto-google-cloud-language-v1:1.81.4" - implementation "org.apache.commons:commons-lang3:3.9" - implementation "org.apache.httpcomponents:httpclient:4.5.13" - implementation "org.apache.httpcomponents:httpcore:4.4.13" - implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1" - implementation "com.fasterxml.jackson.core:jackson-core:2.14.1" runtimeOnly library.java.hadoop_client runtimeOnly library.java.bigdataoss_gcs_connector testImplementation project(path: ":runners:direct-java", configuration: "shadow") - testImplementation project(":sdks:java:io:google-cloud-platform") - testImplementation project(":sdks:java:extensions:ml") - testImplementation library.java.google_cloud_bigquery testImplementation library.java.hamcrest testImplementation library.java.junit testImplementation library.java.mockito_core