diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 2c1b59c9ad59..0d0d18c78b32 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" ) @@ -43,6 +44,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { // DoFns must be registered with beam. func init() { beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } // [END model_pardo_pardo] @@ -78,9 +84,8 @@ func splitStringPair(e stringPair) (string, string) { } func init() { - // Register element types and DoFns. - beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem()) - beam.RegisterFunction(splitStringPair) + // Register DoFn. 
+ register.Function1x2(splitStringPair) } // CreateAndSplit is a helper function that creates @@ -202,7 +207,9 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str } func init() { - beam.RegisterFunction(formatCoGBKResults) + register.Function3x1(formatCoGBKResults) + // 1 input of type string => Iter1[string] + register.Iter1[string]() } // [END cogroupbykey_output_helpers] @@ -256,7 +263,7 @@ func sumInts(a, v int) int { } func init() { - beam.RegisterFunction(sumInts) + register.Function2x1(sumInts) } func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection { @@ -276,7 +283,7 @@ func (fn *boundedSum) MergeAccumulators(a, v int) int { } func init() { - beam.RegisterType(reflect.TypeOf((*boundedSum)(nil))) + register.Combiner1[int](&boundedSum{}) } func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection { @@ -313,7 +320,7 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 { } func init() { - beam.RegisterType(reflect.TypeOf((*averageFn)(nil))) + register.Combiner3[averageAccum, int, float64](&averageFn{}) } // [END combine_custom_average] @@ -370,7 +377,7 @@ func decileFn(student Student) int { } func init() { - beam.RegisterFunction(decileFn) + register.Function1x1(decileFn) } // [END model_multiple_pcollections_partition_fn] @@ -417,8 +424,12 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st } func init() { - beam.RegisterFunction(filterWordsAbove) - beam.RegisterFunction(filterWordsBelow) + register.Function3x1(filterWordsAbove) + register.Function3x0(filterWordsBelow) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() + // 1 input of type float64 => Iter1[float64] + register.Iter1[float64]() } // [END model_pardo_side_input_dofn] @@ -470,8 +481,10 @@ func processWordsMixed(word string, emitMarked func(string)) int { } func init() { - beam.RegisterFunction(processWords) - 
beam.RegisterFunction(processWordsMixed) + register.Function4x0(processWords) + register.Function2x1(processWordsMixed) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() } // [END model_multiple_output_dofn] @@ -515,7 +528,9 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { // [END model_paneinfo] func init() { - beam.RegisterFunction(extractWordsFn) + register.Function3x0(extractWordsFn) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() } // [START countwords_composite] diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index d4b822d0f27a..5653e16dbf15 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -432,12 +432,6 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed. -{{< paragraph class="language-go" >}} -Custom struct types should be registered with beam using `beam.RegisterType`. -Among other things, this allows the Go SDK to infer an encoding from their -exported fields. Unexported fields in struct types are ignored. -{{< /paragraph >}} - #### 3.2.2. Element schema {#element-schema} In many cases, the element type in a `PCollection` has a structure that can introspected. @@ -704,6 +698,31 @@ processing function. +{{< paragraph class="language-go" >}} +All DoFns should be registered using a generic `register.DoFnXxY[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the DoFn for execution on remote runners, and optimizes the runtime +execution of the DoFns via reflection. +{{< /paragraph >}} + +{{< highlight go >}} +// ComputeWordLengthFn is a DoFn that computes the word length of string elements. 
+type ComputeWordLengthFn struct{} + +// ProcessElement computes the length of word and emits the result. +// When creating structs as a DoFn, the ProcessElement method performs the +// work of this step in the pipeline. +func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) int { + ... +} + +func init() { + // 2 inputs and 1 output => DoFn2x1 + // Input/output types are included in order in the brackets + register.DoFn2x1[context.Context, string, int](&ComputeWordLengthFn{}) +} +{{< /highlight >}} + ##### 4.2.1.1. Applying ParDo {#applying-pardo} {{< paragraph class="language-java language-py" >}} @@ -799,7 +818,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { } func init() { - beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } {{< /highlight >}} @@ -862,7 +885,11 @@ Simple DoFns can also be written as functions. func ComputeWordLengthFn(word string, emit func(int)) { ... } func init() { - beam.RegisterFunction(ComputeWordLengthFn) + // 2 inputs and 0 outputs => Function2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.Function2x0(ComputeWordLengthFn) + register.Emitter1[int]() } {{< /highlight >}} @@ -1027,7 +1054,7 @@ var words beam.PCollection = ... > **Note:** Anonymous function DoFns may not work on distributed runners. -> It's recommended to use named functions and register them with `beam.RegisterFunction` in +> It's recommended to use named functions and register them with `register.FunctionXxY` in > an `init()` block. 
@@ -1329,6 +1356,26 @@ public static class SumInts implements SerializableFunction, I {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}} {{< /highlight >}} +{{< paragraph class="language-go" >}} +All Combiners should be registered using a generic `register.CombinerX[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the Combiner for execution on remote runners, and optimizes the runtime +execution of the Combiner via reflection. + +Combiner1 should be used when your accumulator, input, and output are all of the +same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T` +is the type of the input/accumulator/output. + +Combiner2 should be used when your accumulator, input, and output are 2 distinct +types. It can be called with `register.Combiner2[T1, T2](&CustomCombiner{})` where +`T1` is the type of the accumulator and `T2` is the other type. + +Combiner3 should be used when your accumulator, input, and output are 3 distinct +types. It can be called with `register.Combiner3[T1, T2, T3](&CustomCombiner{})` +where `T1` is the type of the accumulator, `T2` is the type of the input, and `T3` is +the type of the output. +{{< /paragraph >}} + {{< highlight go >}} {{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}} {{< /highlight >}} @@ -1710,9 +1757,10 @@ a remote worker in your processing cluster. as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`; however, your subclass must not add any non-serializable members. Funcs are serializable as long as -they are registered with `beam.RegisterFunction`, and are not -closures. Structural `DoFn`s will have all exported fields serialized. -Unexported fields are unable to be serialized, and will be silently ignored. +they are registered with `register.FunctionXxY` (for simple functions) or +`register.DoFnXxY` (for structural DoFns), and are not closures. 
Structural +`DoFn`s will have all exported fields serialized. Unexported fields are unable to +be serialized, and will be silently ignored. Some other serializability factors you should keep in mind are: @@ -1773,6 +1821,10 @@ processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. +{{< paragraph class="language-go" >}} +All side input iterables should be registered using a generic `register.IterX[...]` +function. This optimizes runtime execution of the iterable. +{{< /paragraph >}} #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} @@ -1964,6 +2016,9 @@ multiple output PCollections. Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. + +All emitters should be registered using a generic `register.EmitterX[...]` +function. This optimizes runtime execution of the emitter. {{< /paragraph >}} {{< paragraph class="language-go" >}}