Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions sdks/go/examples/snippets/04transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

Expand All @@ -43,6 +44,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
// DoFns must be registered with beam.
func init() {
beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
// 2 inputs and 0 outputs => DoFn2x0
// 1 input => Emitter1
// Input/output types are included in order in the brackets
register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{})
register.Emitter1[int]()
}

// [END model_pardo_pardo]
Expand Down Expand Up @@ -78,9 +84,8 @@ func splitStringPair(e stringPair) (string, string) {
}

func init() {
// Register element types and DoFns.
beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem())
beam.RegisterFunction(splitStringPair)
// Register DoFn.
register.Function1x2(splitStringPair)
}

// CreateAndSplit is a helper function that creates
Expand Down Expand Up @@ -202,7 +207,9 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str
}

func init() {
beam.RegisterFunction(formatCoGBKResults)
register.Function3x1(formatCoGBKResults)
// 1 input of type string => Iter1[string]
register.Iter1[string]()
}

// [END cogroupbykey_output_helpers]
Expand Down Expand Up @@ -256,7 +263,7 @@ func sumInts(a, v int) int {
}

func init() {
beam.RegisterFunction(sumInts)
register.Function2x1(sumInts)
}

func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection {
Expand All @@ -276,7 +283,7 @@ func (fn *boundedSum) MergeAccumulators(a, v int) int {
}

func init() {
beam.RegisterType(reflect.TypeOf((*boundedSum)(nil)))
register.Combiner1[int](&boundedSum{})
}

func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection {
Expand Down Expand Up @@ -313,7 +320,7 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
}

func init() {
beam.RegisterType(reflect.TypeOf((*averageFn)(nil)))
register.Combiner3[averageAccum, int, float64](&averageFn{})
}

// [END combine_custom_average]
Expand Down Expand Up @@ -370,7 +377,7 @@ func decileFn(student Student) int {
}

func init() {
beam.RegisterFunction(decileFn)
register.Function1x1(decileFn)
}

// [END model_multiple_pcollections_partition_fn]
Expand Down Expand Up @@ -417,8 +424,12 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st
}

func init() {
beam.RegisterFunction(filterWordsAbove)
beam.RegisterFunction(filterWordsBelow)
register.Function3x1(filterWordsAbove)
register.Function3x0(filterWordsBelow)
// 1 input of type string => Emitter1[string]
register.Emitter1[string]()
// 1 input of type float64 => Iter1[float64]
register.Iter1[float64]()
}

// [END model_pardo_side_input_dofn]
Expand Down Expand Up @@ -470,8 +481,10 @@ func processWordsMixed(word string, emitMarked func(string)) int {
}

func init() {
beam.RegisterFunction(processWords)
beam.RegisterFunction(processWordsMixed)
register.Function4x0(processWords)
register.Function2x1(processWordsMixed)
// 1 input of type string => Emitter1[string]
register.Emitter1[string]()
}

// [END model_multiple_output_dofn]
Expand Down Expand Up @@ -515,7 +528,9 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) {
// [END model_paneinfo]

func init() {
beam.RegisterFunction(extractWordsFn)
register.Function3x0(extractWordsFn)
// 1 input of type string => Emitter1[string]
register.Emitter1[string]()
}

// [START countwords_composite]
Expand Down
79 changes: 67 additions & 12 deletions website/www/site/content/en/documentation/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,6 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism
that includes built-in encoding for commonly-used types as well as support for
specifying custom encodings as needed.

{{< paragraph class="language-go" >}}
Custom struct types should be registered with beam using `beam.RegisterType`.
Among other things, this allows the Go SDK to infer an encoding from their
exported fields. Unexported fields in struct types are ignored.
{{< /paragraph >}}

#### 3.2.2. Element schema {#element-schema}

In many cases, the element type in a `PCollection` has a structure that can be introspected.
Expand Down Expand Up @@ -704,6 +698,31 @@ processing function.

</span>

{{< paragraph class="language-go" >}}
All DoFns should be registered using a generic `register.DoFnXxY[...]`
function. This allows the Go SDK to infer an encoding from any inputs/outputs,
registers the DoFn for execution on remote runners, and optimizes the runtime
execution of the DoFns via reflection.
{{< /paragraph >}}

{{< highlight go >}}
// ComputeWordLengthFn is a DoFn that computes the word length of string elements.
type ComputeWordLengthFn struct{}

// ProcessElement computes the length of word and emits the result.
// When creating structs as a DoFn, the ProcessElement method performs the
// work of this step in the pipeline.
func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) int {
...
}

func init() {
// 2 inputs and 1 output => DoFn2x1
// Input/output types are included in order in the brackets
register.DoFn2x1[context.Context, string, int](&ComputeWordLengthFn{})
}
{{< /highlight >}}

##### 4.2.1.1. Applying ParDo {#applying-pardo}

{{< paragraph class="language-java language-py" >}}
Expand Down Expand Up @@ -799,7 +818,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
}

func init() {
beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
// 2 inputs and 0 outputs => DoFn2x0
// 1 input => Emitter1
// Input/output types are included in order in the brackets
register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{})
register.Emitter1[int]()
}
{{< /highlight >}}

Expand Down Expand Up @@ -862,7 +885,11 @@ Simple DoFns can also be written as functions.
func ComputeWordLengthFn(word string, emit func(int)) { ... }

func init() {
beam.RegisterFunction(ComputeWordLengthFn)
// 2 inputs and 0 outputs => Function2x0
// 1 input => Emitter1
// Input/output types are included in order in the brackets
register.Function2x0(ComputeWordLengthFn)
register.Emitter1[int]()
}
{{< /highlight >}}

Expand Down Expand Up @@ -1027,7 +1054,7 @@ var words beam.PCollection = ...
<span class="language-go" >

> **Note:** Anonymous function DoFns may not work on distributed runners.
> It's recommended to use named functions and register them with `beam.RegisterFunction` in
> It's recommended to use named functions and register them with `register.FunctionXxY` in
> an `init()` block.

</span>
Expand Down Expand Up @@ -1329,6 +1356,26 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}}
{{< /highlight >}}

{{< paragraph class="language-go" >}}
All Combiners should be registered using a generic `register.CombinerX[...]`
function. This allows the Go SDK to infer an encoding from any inputs/outputs,
registers the Combiner for execution on remote runners, and optimizes the runtime
execution of the Combiner via reflection.

Combiner1 should be used when your accumulator, input, and output are all of the
same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T`
is the type of the input/accumulator/output.

Combiner2 should be used when your accumulator, input, and output are 2 distinct
types. It can be called with `register.Combiner2[T1, T2](&CustomCombiner{})` where
`T1` is the type of the accumulator and `T2` is the other type.

Combiner3 should be used when your accumulator, input, and output are 3 distinct
types. It can be called with `register.Combiner3[T1, T2, T3](&CustomCombiner{})`
where `T1` is the type of the accumulator, `T2` is the type of the input, and `T3` is
the type of the output.
{{< /paragraph >}}

{{< highlight go >}}
{{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}}
{{< /highlight >}}
Expand Down Expand Up @@ -1710,9 +1757,10 @@ a remote worker in your processing cluster.
as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`;
however, your subclass must not add any non-serializable members.</span>
<span class="language-go">Funcs are serializable as long as
they are registered with `beam.RegisterFunction`, and are not
closures. Structural `DoFn`s will have all exported fields serialized.
Unexported fields are unable to be serialized, and will be silently ignored.</span>
they are registered with `register.FunctionXxY` (for simple functions) or
`register.DoFnXxY` (for structural DoFns), and are not closures. Structural
`DoFn`s will have all exported fields serialized. Unexported fields are unable to
be serialized, and will be silently ignored.</span>

Some other serializability factors you should keep in mind are:

Expand Down Expand Up @@ -1773,6 +1821,10 @@ processing each element in the input `PCollection`, but the additional data
needs to be determined at runtime (and not hard-coded). Such values might be
determined by the input data, or depend on a different branch of your pipeline.

{{< paragraph class="language-go" >}}
All side input iterables should be registered using a generic `register.IterX[...]`
function. This optimizes runtime execution of the iterable.
{{< /paragraph >}}

#### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo}

Expand Down Expand Up @@ -1964,6 +2016,9 @@ multiple output PCollections.
Call emitter functions as needed to produce 0 or more elements for its matching
`PCollection`. The same value can be emitted with multiple emitters.
As normal, do not mutate values after emitting them from any emitter.

All emitters should be registered using a generic `register.EmitterX[...]`
function. This optimizes runtime execution of the emitter.
{{< /paragraph >}}

{{< paragraph class="language-go" >}}
Expand Down