From dd848b7ab83f1f1b4728aa37472debee55dade97 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 12 May 2022 10:11:29 -0400 Subject: [PATCH 1/6] Update docs to prefer generic registration functions --- sdks/go/examples/snippets/04transforms.go | 13 +++- .../en/documentation/programming-guide.md | 68 ++++++++++++++++--- 2 files changed, 67 insertions(+), 14 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 80ff7ed66aae..aedc8a1f9af7 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" ) @@ -41,7 +42,8 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { // DoFns must be registered with beam. func init() { - beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + register.DoFn[string, func(int)](&ComputeWordLengthFn{})) + register.Emitter1[int]() } // [END model_pardo_pardo] @@ -185,6 +187,7 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str func init() { beam.RegisterFunction(formatCoGBKResults) + register.Iter1[string]() } // [END cogroupbykey_output_helpers] @@ -258,7 +261,7 @@ func (fn *boundedSum) MergeAccumulators(a, v int) int { } func init() { - beam.RegisterType(reflect.TypeOf((*boundedSum)(nil))) + register.Combiner1[int](&boundedSum{}) } func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection { @@ -295,7 +298,7 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 { } func init() { - beam.RegisterType(reflect.TypeOf((*averageFn)(nil))) + register.Combiner3[averageAccum, int, float64](&averageFn{}) } // [END combine_custom_average] @@ -401,6 +404,8 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st func init() { beam.RegisterFunction(filterWordsAbove) beam.RegisterFunction(filterWordsBelow) + register.Emitter1[string]() + register.Iter1[float64]() } // [END model_pardo_side_input_dofn] @@ -454,6 +459,7 @@ func processWordsMixed(word string, emitMarked func(string)) int { func init() { beam.RegisterFunction(processWords) beam.RegisterFunction(processWordsMixed) + register.Emitter1[string]() } // [END model_multiple_output_dofn] @@ -498,6 +504,7 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { func init() { beam.RegisterFunction(extractWordsFn) + register.Emitter1[string]() } // [START countwords_composite] diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index d0436def2f68..855e8077390d 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -432,12 +432,6 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed. -{{< paragraph class="language-go" >}} -Custom struct types should be registered with beam using `beam.RegisterType`. -Among other things, this allows the Go SDK to infer an encoding from their -exported fields. Unexported fields in struct types are ignored. -{{< /paragraph >}} - #### 3.2.2. Element schema {#element-schema} In many cases, the element type in a `PCollection` has a structure that can introspected. @@ -704,6 +698,29 @@ processing function. +{{< paragraph class="language-go" >}} +All DoFns should be registered using the generic `register.DoFnXxY[...]` +This allows the Go SDK to infer an encoding from any inputs/outputs, registers +the DoFn for execution on remote runners, and optimizes the runtime execution of +the DoFns via reflection. +{{< /paragraph >}} + +{{< highlight go >}} +// ComputeWordLengthFn is a DoFn that computes the word length of string elements. +type ComputeWordLengthFn struct{} + +// ProcessElement computes the length of word and emits the result. +// When creating structs as a DoFn, the ProcessElement method performs the +// work of this step in the pipeline. +func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) int { + ... +} + +func init() { + register.DoFn2x1[string, context.Context, int](&ComputeWordLengthFn{}) +} +{{< /highlight >}} + ##### 4.2.1.1. Applying ParDo {#applying-pardo} {{< paragraph class="language-java language-py" >}} @@ -799,7 +816,8 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { } func init() { - beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } {{< /highlight >}} @@ -862,7 +880,8 @@ Simple DoFns can also be written as functions. func ComputeWordLengthFn(word string, emit func(int)) { ... } func init() { - beam.RegisterFunction(ComputeWordLengthFn) + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } {{< /highlight >}} @@ -1329,6 +1348,25 @@ public static class SumInts implements SerializableFunction, I {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}} {{< /highlight >}} +{{< paragraph class="language-go" >}} +All Combiners should be registered using the generic `register.CombinerX[...]` +This allows the Go SDK to infer an encoding from any inputs/outputs, registers +the DoFn for execution on remote runners, and optimizes the runtime execution of +the DoFns via reflection. + +Combiner1 should be used when your accumulator, input, and output are all of the +same type. It can be called with register.Combiner1[T](&CustomCombiner{}) where T +is the type of the input/accumulator/output. + +Combiner2 should be used when your accumulator, input, and output are 2 distinct +types. It can be called with register.Combiner2[T1, T2](&CustomCombiner{}) where +T1 is the type of the accumulator and T2 is the other type. + +Combiner3 should be used when your accumulator, input, and output are 3 distinct +types. It can be called with register.Combiner3[T1, T2, T3](&CustomCombiner{}) +where T1 is the type of the accumulator, T2 is the type of the input, and T3 is the type of the output. +{{< /paragraph >}} + {{< highlight go >}} {{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}} {{< /highlight >}} @@ -1710,9 +1748,10 @@ a remote worker in your processing cluster. as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`; however, your subclass must not add any non-serializable members. Funcs are serializable as long as -they are registered with `beam.RegisterFunction`, and are not -closures. Structural `DoFn`s will have all exported fields serialized. -Unexported fields are unable to be serialized, and will be silently ignored. +they are registered with `beam.RegisterFunction` (for simple functions) or +`register.DoFnXxY` (for sturctural DoFns), and are not closures. Structural +`DoFn`s will have all exported fields serialized. Unexported fields are unable to +be serialized, and will be silently ignored. Some other serializability factors you should keep in mind are: @@ -1773,6 +1812,10 @@ processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. +{{< paragraph class="language-go" >}} +All side input iters should be registered using the generic `register.IterX[...]` +This optimizes runtime execution of the iter. +{{< /paragraph >}} #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} @@ -1964,6 +2007,9 @@ multiple output PCollections. Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. + +All emitters should be registered using the generic `register.DoFnX[...]` +This optimizes runtime execution of the emitter. {{< /paragraph >}} {{< paragraph class="language-go" >}} From fa874fd0e1cbfe31cf9fa226a7c740da36b00e9d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 12 May 2022 10:15:50 -0400 Subject: [PATCH 2/6] CHANGES.md changes --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index ae674ead6e53..10b891d12592 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Go SDK users can now use generic registration functions to optimize their DoFn execution. ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)) * Go SDK users may now write self-checkpointing Splittable DoFns to read from streaming sources. ([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104)) ## Breaking Changes From edc8a3168838208fa701246c0f5001b652581f2e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 12 May 2022 10:18:54 -0400 Subject: [PATCH 3/6] Syntax fixes --- sdks/go/examples/snippets/04transforms.go | 2 +- website/www/site/content/en/documentation/programming-guide.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index aedc8a1f9af7..cf47d23463ea 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -42,7 +42,7 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { // DoFns must be registered with beam. func init() { - register.DoFn[string, func(int)](&ComputeWordLengthFn{})) + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) register.Emitter1[int]() } diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 855e8077390d..2965e80793b6 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -2008,7 +2008,7 @@ Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. -All emitters should be registered using the generic `register.DoFnX[...]` +All emitters should be registered using the generic `register.DoFnXxY[...]` This optimizes runtime execution of the emitter. {{< /paragraph >}} From 190d5c2398c4d04f1078b39091ee6901825e9e33 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 12 May 2022 10:44:21 -0400 Subject: [PATCH 4/6] Formatting --- .../content/en/documentation/programming-guide.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 2965e80793b6..93e9512720f9 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -817,7 +817,7 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { func init() { register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) - register.Emitter1[int]() + register.Emitter1[int]() } {{< /highlight >}} @@ -881,7 +881,7 @@ func ComputeWordLengthFn(word string, emit func(int)) { ... } func init() { register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) - register.Emitter1[int]() + register.Emitter1[int]() } {{< /highlight >}} @@ -1355,16 +1355,17 @@ the DoFn for execution on remote runners, and optimizes the runtime execution of the DoFns via reflection. Combiner1 should be used when your accumulator, input, and output are all of the -same type. It can be called with register.Combiner1[T](&CustomCombiner{}) where T +same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T` is the type of the input/accumulator/output. Combiner2 should be used when your accumulator, input, and output are 2 distinct -types. It can be called with register.Combiner2[T1, T2](&CustomCombiner{}) where -T1 is the type of the accumulator and T2 is the other type. +types. It can be called with `register.Combiner2[T1, T2]`(&CustomCombiner{}) where +`T1` is the type of the accumulator and `T2` is the other type. Combiner3 should be used when your accumulator, input, and output are 3 distinct -types. It can be called with register.Combiner3[T1, T2, T3](&CustomCombiner{}) -where T1 is the type of the accumulator, T2 is the type of the input, and T3 is the type of the output. +types. It can be called with `register.Combiner3[T1, T2, T3](&CustomCombiner{})` +where `T1` is the type of the accumulator, `T2` is the type of the input, and `T3` is +the type of the output. {{< /paragraph >}} {{< highlight go >}} From 8aef916cabad720064453856128b7315b3b77c24 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 12 May 2022 11:42:42 -0400 Subject: [PATCH 5/6] Wording --- .../en/documentation/programming-guide.md | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 93e9512720f9..4154ce79598d 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -699,10 +699,10 @@ processing function. {{< paragraph class="language-go" >}} -All DoFns should be registered using the generic `register.DoFnXxY[...]` -This allows the Go SDK to infer an encoding from any inputs/outputs, registers -the DoFn for execution on remote runners, and optimizes the runtime execution of -the DoFns via reflection. +All DoFns should be registered using a generic `register.DoFnXxY[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the DoFn for execution on remote runners, and optimizes the runtime +execution of the DoFns via reflection. {{< /paragraph >}} {{< highlight go >}} @@ -1349,10 +1349,10 @@ public static class SumInts implements SerializableFunction, I {{< /highlight >}} {{< paragraph class="language-go" >}} -All Combiners should be registered using the generic `register.CombinerX[...]` -This allows the Go SDK to infer an encoding from any inputs/outputs, registers -the DoFn for execution on remote runners, and optimizes the runtime execution of -the DoFns via reflection. +All Combiners should be registered using a generic `register.CombinerX[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the Combiner for execution on remote runners, and optimizes the runtime +execution of the Combiner via reflection. Combiner1 should be used when your accumulator, input, and output are all of the same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T` @@ -1814,8 +1814,8 @@ needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. {{< paragraph class="language-go" >}} -All side input iters should be registered using the generic `register.IterX[...]` -This optimizes runtime execution of the iter. +All side input iters should be registered using a generic `register.IterX[...]` +function. This optimizes runtime execution of the iter. {{< /paragraph >}} #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} @@ -2009,8 +2009,8 @@ Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. -All emitters should be registered using the generic `register.DoFnXxY[...]` -This optimizes runtime execution of the emitter. +All emitters should be registered using a generic `register.DoFnXxY[...]` +function. This optimizes runtime execution of the emitter. {{< /paragraph >}} {{< paragraph class="language-go" >}} From 3ffc3811813a05ad583c195f79e59aa7f85bd555 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 12 May 2022 12:06:30 -0400 Subject: [PATCH 6/6] Explanatory comments --- sdks/go/examples/snippets/04transforms.go | 8 ++++++++ .../site/content/en/documentation/programming-guide.md | 10 +++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index cf47d23463ea..f0dd1e780978 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -42,6 +42,9 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { // DoFns must be registered with beam. func init() { + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) register.Emitter1[int]() } @@ -187,6 +190,7 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str func init() { beam.RegisterFunction(formatCoGBKResults) + // 1 input of type string => Iter1[string] register.Iter1[string]() } @@ -404,7 +408,9 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st func init() { beam.RegisterFunction(filterWordsAbove) beam.RegisterFunction(filterWordsBelow) + // 1 input of type string => Emitter1[string] register.Emitter1[string]() + // 1 input of type float64 => Iter1[float64] register.Iter1[float64]() } @@ -459,6 +465,7 @@ func processWordsMixed(word string, emitMarked func(string)) int { func init() { beam.RegisterFunction(processWords) beam.RegisterFunction(processWordsMixed) + // 1 input of type string => Emitter1[string] register.Emitter1[string]() } @@ -504,6 +511,7 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { func init() { beam.RegisterFunction(extractWordsFn) + // 1 input of type string => Emitter1[string] register.Emitter1[string]() } diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 4154ce79598d..64dcdc3d7b5e 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -717,7 +717,9 @@ func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) } func init() { - register.DoFn2x1[string, context.Context, int](&ComputeWordLengthFn{}) + // 2 inputs and 1 output => DoFn2x1 + // Input/output types are included in order in the brackets + register.DoFn2x1[context.Context, string, int](&ComputeWordLengthFn{}) } {{< /highlight >}} @@ -816,6 +818,9 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { } func init() { + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) register.Emitter1[int]() } @@ -880,6 +885,9 @@ Simple DoFns can also be written as functions. func ComputeWordLengthFn(word string, emit func(int)) { ... } func init() { + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) register.Emitter1[int]() }