diff --git a/CHANGES.md b/CHANGES.md index ae674ead6e53..10b891d12592 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Go SDK users can now use generic registration functions to optimize their DoFn execution. ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)) * Go SDK users may now write self-checkpointing Splittable DoFns to read from streaming sources. ([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104)) ## Breaking Changes diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 80ff7ed66aae..f0dd1e780978 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" ) @@ -41,7 +42,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { // DoFns must be registered with beam. 
func init() { - beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } // [END model_pardo_pardo] @@ -185,6 +190,8 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str func init() { beam.RegisterFunction(formatCoGBKResults) + // 1 input of type string => Iter1[string] + register.Iter1[string]() } // [END cogroupbykey_output_helpers] @@ -258,7 +265,7 @@ func (fn *boundedSum) MergeAccumulators(a, v int) int { } func init() { - beam.RegisterType(reflect.TypeOf((*boundedSum)(nil))) + register.Combiner1[int](&boundedSum{}) } func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection { @@ -295,7 +302,7 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 { } func init() { - beam.RegisterType(reflect.TypeOf((*averageFn)(nil))) + register.Combiner3[averageAccum, int, float64](&averageFn{}) } // [END combine_custom_average] @@ -401,6 +408,10 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st func init() { beam.RegisterFunction(filterWordsAbove) beam.RegisterFunction(filterWordsBelow) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() + // 1 input of type float64 => Iter1[float64] + register.Iter1[float64]() } // [END model_pardo_side_input_dofn] @@ -454,6 +465,8 @@ func processWordsMixed(word string, emitMarked func(string)) int { func init() { beam.RegisterFunction(processWords) beam.RegisterFunction(processWordsMixed) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() } // [END model_multiple_output_dofn] @@ -498,6 +511,8 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { func init() { beam.RegisterFunction(extractWordsFn) + // 1 input of type string => 
Emitter1[string] + register.Emitter1[string]() } // [START countwords_composite] diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index d0436def2f68..64dcdc3d7b5e 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -432,12 +432,6 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed. -{{< paragraph class="language-go" >}} -Custom struct types should be registered with beam using `beam.RegisterType`. -Among other things, this allows the Go SDK to infer an encoding from their -exported fields. Unexported fields in struct types are ignored. -{{< /paragraph >}} - #### 3.2.2. Element schema {#element-schema} In many cases, the element type in a `PCollection` has a structure that can introspected. @@ -704,6 +698,31 @@ processing function. +{{< paragraph class="language-go" >}} +All DoFns should be registered using a generic `register.DoFnXxY[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the DoFn for execution on remote runners, and optimizes the runtime +execution of the DoFns via reflection. +{{< /paragraph >}} + +{{< highlight go >}} +// ComputeWordLengthFn is a DoFn that computes the word length of string elements. +type ComputeWordLengthFn struct{} + +// ProcessElement computes the length of word and emits the result. +// When creating structs as a DoFn, the ProcessElement method performs the +// work of this step in the pipeline. +func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) int { + ... 
+} + +func init() { + // 2 inputs and 1 output => DoFn2x1 + // Input/output types are included in order in the brackets + register.DoFn2x1[context.Context, string, int](&ComputeWordLengthFn{}) +} +{{< /highlight >}} + ##### 4.2.1.1. Applying ParDo {#applying-pardo} {{< paragraph class="language-java language-py" >}} @@ -799,7 +818,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { } func init() { - beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } {{< /highlight >}} @@ -862,7 +885,11 @@ Simple DoFns can also be written as functions. func ComputeWordLengthFn(word string, emit func(int)) { ... } func init() { - beam.RegisterFunction(ComputeWordLengthFn) + // 2 inputs and 0 outputs => Function2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.Function2x0[string, func(int)](ComputeWordLengthFn) + register.Emitter1[int]() } {{< /highlight >}} @@ -1329,6 +1356,26 @@ public static class SumInts implements SerializableFunction, I {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}} {{< /highlight >}} +{{< paragraph class="language-go" >}} +All Combiners should be registered using a generic `register.CombinerX[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the Combiner for execution on remote runners, and optimizes the runtime +execution of the Combiner via reflection. + +Combiner1 should be used when your accumulator, input, and output are all of the +same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T` +is the type of the input/accumulator/output. + +Combiner2 should be used when your accumulator, input, and output are 2 distinct +types. 
It can be called with `register.Combiner2[T1, T2](&CustomCombiner{})` where +`T1` is the type of the accumulator and `T2` is the other type. + +Combiner3 should be used when your accumulator, input, and output are 3 distinct +types. It can be called with `register.Combiner3[T1, T2, T3](&CustomCombiner{})` +where `T1` is the type of the accumulator, `T2` is the type of the input, and `T3` is +the type of the output. +{{< /paragraph >}} + {{< highlight go >}} {{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}} {{< /highlight >}} @@ -1710,9 +1757,10 @@ a remote worker in your processing cluster. as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`; however, your subclass must not add any non-serializable members. Funcs are serializable as long as -they are registered with `beam.RegisterFunction`, and are not -closures. Structural `DoFn`s will have all exported fields serialized. -Unexported fields are unable to be serialized, and will be silently ignored. +they are registered with `beam.RegisterFunction` (for simple functions) or +`register.DoFnXxY` (for structural DoFns), and are not closures. Structural +`DoFn`s will have all exported fields serialized. Unexported fields are unable to +be serialized, and will be silently ignored. Some other serializability factors you should keep in mind are: @@ -1773,6 +1821,10 @@ processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. +{{< paragraph class="language-go" >}} +All side input iters should be registered using a generic `register.IterX[...]` +function. This optimizes runtime execution of the iter. +{{< /paragraph >}} #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} @@ -1964,6 +2016,9 @@ multiple output PCollections. 
Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. + +All emitters should be registered using a generic `register.EmitterX[...]` +function. This optimizes runtime execution of the emitter. {{< /paragraph >}} {{< paragraph class="language-go" >}}