diff --git a/learning/tour-of-beam/learning-content/go/content-info.yaml b/learning/tour-of-beam/learning-content/go/content-info.yaml new file mode 100644 index 000000000000..8b75fa8771d2 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/content-info.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: Go +content: + - introduction \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/description.md b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/description.md new file mode 100644 index 000000000000..0a990668241e --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/description.md @@ -0,0 +1,56 @@ + +### Creating PCollection + +Now that you know how to create a Beam pipeline and pass parameters into it, it is time to learn how to create an initial `PCollection` and fill it with data. + +There are several options: + +→ You can create a PCollection of data stored in an in-memory collection class in your driver program. 
+ +→ You can also read the data from a variety of external sources such as local or cloud-based files, databases, or other sources using Beam-provided I/O adapters. + +Throughout the tour, most of the examples use either a `PCollection` created from in-memory data or data read from one of the cloud buckets "beam-examples" or "dataflow-samples". These buckets contain sample data sets specifically created for educational purposes. + +We encourage you to take a look, explore these data sets and use them while learning Apache Beam. + +### Creating a PCollection from in-memory data + +You can use the Beam-provided `Create` transform to create a `PCollection` from in-memory Go values. You apply the `Create` transform to the root scope of your pipeline. + +The following example code shows how to do this: + +``` +func main() { + ctx := context.Background() + + // First create pipeline + p, s := beam.NewPipelineWithRoot() + + //Now create the PCollection using list of strings + strings := beam.Create(s, "To", "be", "or", "not", "to", "be","that", "is", "the", "question") + + //Create a numerical PCollection + numbers := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + +} +``` + +### Playground exercise + +You can find the complete code of this example in the playground window, which you can run and experiment with. + +One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console. Don’t worry if you don’t quite understand it, as the `ParDo` transform will be explained later in the course. Feel free, however, to use it in exercises and challenges to explore results. + +Do you also notice in what order elements of PCollection appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes. 
\ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/example/from_memory.go b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/example/from_memory.go new file mode 100644 index 000000000000..ef8cccb3bc19 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/example/from_memory.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: ParDo +// description: ParDo example. 
+// multifile: false +// context_line: 32 +// categories: +// - Quickstart +// complexity: BASIC +// tags: +// - hellobeam + +package main + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "fmt" +) + +func main() { + p, s := beam.NewPipelineWithRoot() + + words := beam.Create(s, "Hello", "world", "it`s", "Beam") + + output(s, words) + + err := beamx.Run(context.Background(), p) + if err != nil { + log.Exitf(context.Background(), "Failed to execute job: %v", err) + } +} + +func output(s beam.Scope, input beam.PCollection) { + beam.ParDo0(s, func(element interface{}) { + fmt.Println(element) + }, input) +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/unit-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/unit-info.yaml new file mode 100644 index 000000000000..d26548d6cda3 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/from-memory/unit-info.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +id: from-memory +name: Creating in-memory PCollections +taskName: ParDo \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/group-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/group-info.yaml new file mode 100644 index 000000000000..418ed516f2b0 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/group-info.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +id: creating-collections +name: Creating Collections +content: +- from-memory +- reading-from-text +- reading-from-csv \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/description.md b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/description.md new file mode 100644 index 000000000000..fd3c6fd460eb --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/description.md @@ -0,0 +1,35 @@ + + +### Read from csv file + +Data processing pipelines often work with tabular data. In many examples and challenges throughout the course, you’ll be working with one of the datasets stored as csv files in either the beam-examples or the dataflow-samples bucket. + +Loading data from a csv file requires some processing and consists of two main parts: +* Loading text lines using the `textio.Read` transform +* Parsing lines of text into tabular format + +### Playground exercise + +Try to experiment with an example in the playground window and modify the code to process other fields from the New York taxi rides dataset. + +Here is a small list of fields and an example record from this dataset: + +| cost | passenger_count | ... | +|------|-----------------|-----| +| 5.8 | 1 | ... | +| 4.6 | 2 | ... | +| 24 | 1 | ... 
| + +Overview [file](https://storage.googleapis.com/apache-beam-samples/nyc_taxi/misc/sample1000.csv) diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/example/csvExample.go b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/example/csvExample.go new file mode 100644 index 000000000000..eeaedbb1ec51 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/example/csvExample.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: CSV +// description: CSV example. 
+//   multifile: false
+//   context_line: 44
+//   categories:
+//     - Quickstart
+//   complexity: BASIC
+//   tags:
+//     - hellobeam
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+// less reports whether a sorts before b; top.Largest uses it to order the costs.
+func less(a, b float64) bool {
+	return a < b
+}
+
+func main() {
+	p, s := beam.NewPipelineWithRoot()
+	file := Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv")
+	cost := applyTransform(s, file)
+	// Keep only the 10 largest costs so that the console output stays short.
+	fixedSizeElements := top.Largest(s, cost, 10, less)
+	output(s, "Total cost: ", fixedSizeElements)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Exitf(context.Background(), "Failed to execute job: %v", err)
+	}
+}
+
+// Read reads from filename(s) specified by a glob string and returns a PCollection.
+func Read(s beam.Scope, glob string) beam.PCollection {
+	return textio.Read(s, glob)
+}
+
+// applyTransform parses each CSV line and returns the value of its 17th column
+// (index 16) as a float64. Lines with fewer columns yield 0.
+func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, func(line string) float64 {
+		taxi := strings.Split(strings.TrimSpace(line), ",")
+		if len(taxi) > 16 {
+			// The error is deliberately ignored: ParseFloat returns 0 for text that is not a number.
+			cost, _ := strconv.ParseFloat(taxi[16], 64)
+			return cost
+		}
+		return 0.0
+	}, input)
+}
+
+// output prints every value of each []float64 element of input, preceded by prefix.
+func output(s beam.Scope, prefix string, input beam.PCollection) {
+	beam.ParDo0(s, func(elements []float64) {
+		for _, element := range elements {
+			fmt.Println(prefix, element)
+		}
+	}, input)
+}
diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/unit-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/unit-info.yaml
new file mode 100644
index 000000000000..3d12a4dc6532
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-csv/unit-info.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# + +id: from-csv +name: Creating PCollections from csv files +taskName: CSV \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/description.md b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/description.md new file mode 100644 index 000000000000..4968aa7d9528 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/description.md @@ -0,0 +1,41 @@ + +### Reading from text file + +You use one of the Beam-provided I/O adapters to read from an external source. The adapters vary in their exact usage, but all of them read from some external data source and return a `PCollection` whose elements represent the data records in that source. + +Each data source adapter has a Read transform; to read, you must apply that transform to the Pipeline object itself. + +`textio.Read`, for example, reads from an external text file and returns a `PCollection` whose elements are of type `string`. Each string represents one line from the text file. Here’s how you would apply `textio.Read` to your Pipeline to create a `PCollection`: + +``` +func main() { + ctx := context.Background() + + // First create pipeline + p, s := beam.NewPipelineWithRoot() + + // Now create the PCollection by reading text files. Separate elements will be added for each line in the input file + lines := textio.Read(s, "gs://some/inputData.txt") + +} +``` + +### Playground exercise + +In the playground window, you can find an example that reads Shakespeare's King Lear from a text file stored in a Google Cloud Storage bucket and fills a PCollection with individual lines and then with individual words. Try it out and see what the output is. + +One of the differences you will see is that the output is much shorter than the input file itself. 
This is because the number of elements in the output `PCollection` is limited with the `top.Largest(s,lines,10,less)` transform. Using the Sample.fixedSizeGlobally transform is another technique you can use to troubleshoot and limit the output sent to the console for debugging purposes in case of large input datasets. + +Overview [file](https://storage.googleapis.com/apache-beam-samples/shakespeare/kinglear.txt) \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/example/textIo.go b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/example/textIo.go new file mode 100644 index 000000000000..a88249e6a0cb --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/example/textIo.go @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// beam-playground: +// name: TextIO +// description: TextIO example. 
+//   multifile: false
+//   context_line: 46
+//   categories:
+//     - Quickstart
+//   complexity: BASIC
+//   tags:
+//     - hellobeam
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+// wordRE matches a run of letters, optionally followed by an apostrophe and one lowercase letter.
+var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+
+// less reports whether a is shorter than b; top.Largest uses it to rank strings by length.
+func less(a, b string) bool {
+	return len(a) < len(b)
+}
+
+func main() {
+	p, s := beam.NewPipelineWithRoot()
+	file := Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
+
+	lines := getLines(s, file)
+	fixedSizeLines := top.Largest(s, lines, 10, less)
+	output(s, "Lines: ", fixedSizeLines)
+
+	words := getWords(s, lines)
+	fixedSizeWords := top.Largest(s, words, 10, less)
+	output(s, "Words: ", fixedSizeWords)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Exitf(context.Background(), "Failed to execute job: %v", err)
+	}
+}
+
+// Read reads from filename(s) specified by a glob string and returns a PCollection.
+func Read(s beam.Scope, glob string) beam.PCollection {
+	return textio.Read(s, glob)
+}
+
+// getLines drops empty lines, so each element of the resulting PCollection is one non-empty line of text.
+func getLines(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return filter.Include(s, input, func(element string) bool {
+		return element != ""
+	})
+}
+
+// getWords splits every line into the words matched by wordRE and returns them as a PCollection.
+func getWords(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, func(line string, emit func(string)) {
+		for _, word := range wordRE.FindAllString(line, -1) {
+			emit(word)
+		}
+	}, input)
+}
+
+// output prints every string of each []string element of input, preceded by prefix.
+func output(s beam.Scope, prefix string, input beam.PCollection) {
+	beam.ParDo0(s, func(elements []string) {
+		for _, element := range elements {
+			fmt.Println(prefix, element)
+		}
+	}, input)
+}
diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/unit-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/unit-info.yaml
new file mode 100644
index 000000000000..396a0190f74a
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/creating-collections/reading-from-text/unit-info.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# + +id: from-text +name: Creating PCollections from text files +taskName: TextIO \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/group-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/group-info.yaml new file mode 100644 index 000000000000..19ef8ae2fd5b --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/group-info.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +id: concepts +name: Beam Concepts +content: +- runner-concepts +- pipeline-concepts +- creating-collections \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md new file mode 100644 index 000000000000..3aa6c64978cd --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md @@ -0,0 +1,36 @@ + +### Creating a pipeline + +The `Pipeline` abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a Pipeline object, and then using that object as the basis for creating the pipeline’s data sets as PCollections and its operations as `Transforms`. + +To use Beam, your driver program must first create an instance of the Beam SDK class Pipeline (typically in the main() function). When you create your `Pipeline`, you’ll also need to set some configuration options. You can set your pipeline’s configuration options programmatically, but it’s often easier to set the options ahead of time (or read them from the command line) and pass them to the Pipeline object when you create the object. + +``` +// beam.Init() is an initialization hook that must be called +// near the beginning of main(), before creating a pipeline. +beam.Init() + +// Create the Pipeline object and root scope. +pipeline, scope := beam.NewPipelineWithRoot() +``` + +### Playground exercise + +You can find the full code of the above example in the playground window, which you can run and experiment with. You can also create the `pipeline` and the `scope` separately, as an alternative to `beam.NewPipelineWithRoot()`. This is convenient if manipulations are needed before creating an element. 
+ +``` +pipeline := beam.NewPipeline() +scope := pipeline.Root() +``` \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/example/main.go b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/example/main.go new file mode 100644 index 000000000000..05e8e0af767b --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/example/main.go @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: creating-pipeline +// description: Creating pipeline example. 
+// multifile: false +// context_line: 34 +// categories: +// - Quickstart +// complexity: BASIC +// tags: +// - hellobeam + +package main + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" +) + + + +func main() { + p, s := beam.NewPipelineWithRoot() + + hello := helloBeam(s) + debug.Print(s, hello) + + err := beamx.Run(context.Background(), p) + if err != nil { + log.Exitf(context.Background(), "Failed to execute job: %v", err) + } +} + +func helloBeam(s beam.Scope) beam.PCollection { + return beam.Create(s, "Hello Beam") +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/unit-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/unit-info.yaml new file mode 100644 index 000000000000..f3c47a5fe1ed --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/unit-info.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +id: creating-pipeline +name: Creating pipelines +taskName: creating-pipeline \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/group-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/group-info.yaml new file mode 100644 index 000000000000..dd46ad6e61ad --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/group-info.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +id: pipeline-concepts +name: Pipeline concepts +content: +- overview-pipeline +- creating-pipeline +- setting-pipeline \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/overview-pipeline/description.md b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/overview-pipeline/description.md new file mode 100644 index 000000000000..ce5cb7f69ff9 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/overview-pipeline/description.md @@ -0,0 +1,43 @@ + + +### Overview + +To use Beam, you first need to create a driver program using the classes in one of the Beam SDKs. Your driver program defines your pipeline, including all of the inputs, transforms, and outputs. It also sets execution options for your pipeline (typically passed by using command-line options). These include the Pipeline Runner, which, in turn, determines what back-end your pipeline will run on. + +The Beam SDKs provide several abstractions that simplify the mechanics of large-scale distributed data processing. The same Beam abstractions work with both batch and streaming data sources. When you create your Beam pipeline, you can think about your data processing task in terms of these abstractions. They include: + +→ `Pipeline`: A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run. + +→ `PCollection`: A PCollection represents a distributed data set that your Beam pipeline operates on. 
The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline. + +→ `PTransform`: A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as the input, performs a processing function that you provide on the elements of that PCollection, and then produces zero or more output PCollection objects. + +→ `Scope`: The Go SDK has an explicit scope variable used to build a `Pipeline`. A Pipeline can return its root scope with the `Root()` method. The scope variable is then passed to `PTransform` functions that place them in the `Pipeline` that owns the `Scope`. + +→ `I/O transforms`: Beam comes with a number of “IOs” - library PTransforms that read or write data to various external storage systems. + +A typical Beam driver program works as follows: + +→ Create a Pipeline object and set the pipeline execution options, including the Pipeline Runner. + +→ Create an initial `PCollection` for pipeline data, either using the IOs to read data from an external storage system, or using a Create transform to build a `PCollection` from in-memory data. + +→ Apply `PTransforms` to each `PCollection`. Transforms can change, filter, group, analyze, or otherwise process the elements in a PCollection. A transform creates a new output PCollection without modifying the input collection. A typical pipeline applies subsequent transforms to each new output PCollection in turn until the processing is complete. 
However, note that a pipeline does not have to be a single straight line of transforms applied one after another: think of PCollections as variables and PTransforms as functions applied to these variables: the shape of the pipeline can be an arbitrarily complex processing graph. + +→ Use IOs to write the final, transformed PCollection(s) to an external source. + +→ Run the pipeline using the designated Pipeline Runner. + +When you run your Beam driver program, the Pipeline Runner that you designate constructs a workflow graph of your pipeline based on the PCollection objects you’ve created and the transforms that you’ve applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous “job” (or equivalent) on that back-end. \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/overview-pipeline/unit-info.yaml b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/overview-pipeline/unit-info.yaml new file mode 100644 index 000000000000..86b85b02ca47 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/overview-pipeline/unit-info.yaml @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +id: overview-pipeline +name: Pipeline overview \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/description.md b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/description.md new file mode 100644 index 000000000000..23781b1e5ea2 --- /dev/null +++ b/learning/tour-of-beam/learning-content/go/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/description.md @@ -0,0 +1,71 @@ + +### Configuring pipeline options + +Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. Your pipeline options will potentially include information such as your project ID or a location for storing files. + +### Setting PipelineOptions from command-line arguments + +Use Go flags to parse command line arguments to configure your pipeline. Flags must be parsed before `beam.Init()` is called. + +``` +// If beamx or Go flags are used, flags must be parsed first, +// before beam.Init() is called. +flag.Parse() +``` + +This interprets command-line arguments that follow the format: + +``` +--