From 68c1bcaea38bd4d98f3b15a846f38c0fc809043d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 1 Jun 2022 11:18:52 -0400 Subject: [PATCH 1/5] [BEAM-14347] Update docs to prefer generic registration functions --- sdks/go/examples/snippets/04transforms.go | 20 ++++- .../en/documentation/programming-guide.md | 77 ++++++++++++++++--- 2 files changed, 84 insertions(+), 13 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 80ff7ed66aae..829bbbd657c0 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" ) @@ -42,6 +43,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { // DoFns must be registered with beam. 
func init() { beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } // [END model_pardo_pardo] @@ -185,6 +191,8 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str func init() { beam.RegisterFunction(formatCoGBKResults) + // 1 input of type string => Iter1[string] + register.Iter1[string]() } // [END cogroupbykey_output_helpers] @@ -258,7 +266,7 @@ func (fn *boundedSum) MergeAccumulators(a, v int) int { } func init() { - beam.RegisterType(reflect.TypeOf((*boundedSum)(nil))) + register.Combiner1[int](&boundedSum{}) } func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection { @@ -295,7 +303,7 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 { } func init() { - beam.RegisterType(reflect.TypeOf((*averageFn)(nil))) + register.Combiner3[averageAccum, int, float64](&averageFn{}) } // [END combine_custom_average] @@ -401,6 +409,10 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st func init() { beam.RegisterFunction(filterWordsAbove) beam.RegisterFunction(filterWordsBelow) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() + // 1 input of type float64 => Iter1[float64] + register.Iter1[float64]() } // [END model_pardo_side_input_dofn] @@ -454,6 +466,8 @@ func processWordsMixed(word string, emitMarked func(string)) int { func init() { beam.RegisterFunction(processWords) beam.RegisterFunction(processWordsMixed) + // 1 input of type string => Emitter1[string] + register.Emitter1[string]() } // [END model_multiple_output_dofn] @@ -498,6 +512,8 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { func init() { beam.RegisterFunction(extractWordsFn) + // 1 input of type string => 
Emitter1[string] + register.Emitter1[string]() } // [START countwords_composite] diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 8bf690488a3a..5546dbbf5549 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -432,12 +432,6 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed. -{{< paragraph class="language-go" >}} -Custom struct types should be registered with beam using `beam.RegisterType`. -Among other things, this allows the Go SDK to infer an encoding from their -exported fields. Unexported fields in struct types are ignored. -{{< /paragraph >}} - #### 3.2.2. Element schema {#element-schema} In many cases, the element type in a `PCollection` has a structure that can introspected. @@ -704,6 +698,31 @@ processing function. +{{< paragraph class="language-go" >}} +All DoFns should be registered using a generic `register.DoFnXxY[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the DoFn for execution on remote runners, and optimizes the runtime +execution of the DoFns via reflection. +{{< /paragraph >}} + +{{< highlight go >}} +// ComputeWordLengthFn is a DoFn that computes the word length of string elements. +type ComputeWordLengthFn struct{} + +// ProcessElement computes the length of word and emits the result. +// When creating structs as a DoFn, the ProcessElement method performs the +// work of this step in the pipeline. +func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) int { + ... 
+} + +func init() { + // 2 inputs and 1 output => DoFn2x1 + // Input/output types are included in order in the brackets + register.DoFn2x1[context.Context, string, int](&ComputeWordLengthFn{}) +} +{{< /highlight >}} + ##### 4.2.1.1. Applying ParDo {#applying-pardo} {{< paragraph class="language-java language-py" >}} @@ -799,7 +818,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) { } func init() { - beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) + // 2 inputs and 0 outputs => DoFn2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.Emitter1[int]() } {{< /highlight >}} @@ -862,7 +885,11 @@ Simple DoFns can also be written as functions. func ComputeWordLengthFn(word string, emit func(int)) { ... } func init() { - beam.RegisterFunction(ComputeWordLengthFn) + // 2 inputs and 0 outputs => Function2x0 + // 1 input => Emitter1 + // Input/output types are included in order in the brackets + register.Function2x0(ComputeWordLengthFn) + register.Emitter1[int]() } {{< /highlight >}} @@ -1329,6 +1356,26 @@ public static class SumInts implements SerializableFunction, I {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}} {{< /highlight >}} +{{< paragraph class="language-go" >}} +All Combiners should be registered using a generic `register.CombinerX[...]` +function. This allows the Go SDK to infer an encoding from any inputs/outputs, +registers the Combiner for execution on remote runners, and optimizes the runtime +execution of the Combiner via reflection. + +Combiner1 should be used when your accumulator, input, and output are all of the +same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T` +is the type of the input/accumulator/output. + +Combiner2 should be used when your accumulator, input, and output are 2 distinct +types. 
It can be called with `register.Combiner2[T1, T2]`(&CustomCombiner{}) where +`T1` is the type of the accumulator and `T2` is the other type. + +Combiner3 should be used when your accumulator, input, and output are 3 distinct +types. It can be called with `register.Combiner3[T1, T2, T3](&CustomCombiner{})` +where `T1` is the type of the accumulator, `T2` is the type of the input, and `T3` is +the type of the output. +{{< /paragraph >}} + {{< highlight go >}} {{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}} {{< /highlight >}} @@ -1710,9 +1757,10 @@ a remote worker in your processing cluster. as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`; however, your subclass must not add any non-serializable members. Funcs are serializable as long as -they are registered with `beam.RegisterFunction`, and are not -closures. Structural `DoFn`s will have all exported fields serialized. -Unexported fields are unable to be serialized, and will be silently ignored. +they are registered with `beam.RegisterFunction` (for simple functions) or +`register.DoFnXxY` (for sturctural DoFns), and are not closures. Structural +`DoFn`s will have all exported fields serialized. Unexported fields are unable to +be serialized, and will be silently ignored. Some other serializability factors you should keep in mind are: @@ -1773,6 +1821,10 @@ processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. +{{< paragraph class="language-go" >}} +All side input iters should be registered using a generic `register.IterX[...]` +function. This optimizes runtime execution of the iter. +{{< /paragraph >}} #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} @@ -1964,6 +2016,9 @@ multiple output PCollections. 
Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. + +All emitters should be registered using a generic `register.DoFnXxY[...]` +function. This optimizes runtime execution of the emitter. {{< /paragraph >}} {{< paragraph class="language-go" >}} From 70ee60169bc905ead4f7b6d0a97aa5ffba969850 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 1 Jun 2022 12:52:40 -0400 Subject: [PATCH 2/5] A little cleanup --- .../www/site/content/en/documentation/programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 5546dbbf5549..7f54acec127d 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -1367,7 +1367,7 @@ same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` wher is the type of the input/accumulator/output. Combiner2 should be used when your accumulator, input, and output are 2 distinct -types. It can be called with `register.Combiner2[T1, T2]`(&CustomCombiner{}) where +types. It can be called with `register.Combiner2[T1, T2](&CustomCombiner{})` where `T1` is the type of the accumulator and `T2` is the other type. Combiner3 should be used when your accumulator, input, and output are 3 distinct @@ -1822,8 +1822,8 @@ needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. {{< paragraph class="language-go" >}} -All side input iters should be registered using a generic `register.IterX[...]` -function. This optimizes runtime execution of the iter. 
+All side input iterables should be registered using a generic `register.IterX[...]` +function. This optimizes runtime execution of the iterable. {{< /paragraph >}} #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} From eecaa2f58b103d0c851ca3b3082eb2518f9f3cd1 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 17 Jun 2022 14:08:23 -0400 Subject: [PATCH 3/5] register.Function changes --- sdks/go/examples/snippets/04transforms.go | 18 +++++++++--------- .../en/documentation/programming-guide.md | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 0eac699ac1f0..57e58093661d 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -86,7 +86,7 @@ func splitStringPair(e stringPair) (string, string) { func init() { // Register element types and DoFns. beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem()) - beam.RegisterFunction(splitStringPair) + register.Function1x2[stringPair, string, string](splitStringPair) } // CreateAndSplit is a helper function that creates @@ -208,7 +208,7 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str } func init() { - beam.RegisterFunction(formatCoGBKResults) + register.Function3x1[string, func(*string) bool, func(*string) bool, string](formatCoGBKResults) // 1 input of type string => Iter1[string] register.Iter1[string]() } @@ -264,7 +264,7 @@ func sumInts(a, v int) int { } func init() { - beam.RegisterFunction(sumInts) + register.Function2x1[int, int, int](sumInts) } func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection { @@ -378,7 +378,7 @@ func decileFn(student Student) int { } func init() { - beam.RegisterFunction(decileFn) + register.Function1x1[Student, int](decileFn) } // [END model_multiple_pcollections_partition_fn] @@ -425,8 +425,8 @@ func filterWordsBelow(word string, lengthCutOff float64, 
emitBelowCutoff func(st } func init() { - beam.RegisterFunction(filterWordsAbove) - beam.RegisterFunction(filterWordsBelow) + register.Function3x1[string, func(*float64) bool, func(string), error](filterWordsAbove) + register.Function3x0[string, float64, func(string)](filterWordsBelow) // 1 input of type string => Emitter1[string] register.Emitter1[string]() // 1 input of type float64 => Iter1[float64] @@ -482,8 +482,8 @@ func processWordsMixed(word string, emitMarked func(string)) int { } func init() { - beam.RegisterFunction(processWords) - beam.RegisterFunction(processWordsMixed) + register.Function4x0[string, func(string), func(string), func(string)](processWords) + register.Function2x1[string, func(string), int](processWordsMixed) // 1 input of type string => Emitter1[string] register.Emitter1[string]() } @@ -529,7 +529,7 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { // [END model_paneinfo] func init() { - beam.RegisterFunction(extractWordsFn) + register.Function3x0[beam.PaneInfo, string, func(string)](extractWordsFn) // 1 input of type string => Emitter1[string] register.Emitter1[string]() } diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index e6557657e85a..7343f85b3488 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -1054,7 +1054,7 @@ var words beam.PCollection = ... > **Note:** Anonymous function DoFns may not work on distributed runners. -> It's recommended to use named functions and register them with `beam.RegisterFunction` in +> It's recommended to use named functions and register them with `register.FunctionXxY[In1, In2, ..., InX, Out1, Out2, ..., OutY](myFunc)` in > an `init()` block. @@ -1757,7 +1757,7 @@ a remote worker in your processing cluster. 
as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`; however, your subclass must not add any non-serializable members. Funcs are serializable as long as -they are registered with `beam.RegisterFunction` (for simple functions) or -`register.DoFnXxY` (for sturctural DoFns), and are not closures. Structural +they are registered with `register.FunctionXxY` (for simple functions) or +`register.DoFnXxY` (for structural DoFns), and are not closures. Structural `DoFn`s will have all exported fields serialized. Unexported fields are unable to be serialized, and will be silently ignored. From bf67dcf9245937a1f92a658e2e9f657b5c5dbafb Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 17 Jun 2022 14:12:18 -0400 Subject: [PATCH 4/5] Type inference is nice --- sdks/go/examples/snippets/04transforms.go | 18 +++++++++--------- .../en/documentation/programming-guide.md | 6 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 57e58093661d..2031bf1eac49 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -86,7 +86,7 @@ func splitStringPair(e stringPair) (string, string) { func init() { // Register element types and DoFns. 
beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem()) - register.Function1x2[stringPair, string, string](splitStringPair) + register.Function1x2(splitStringPair) } // CreateAndSplit is a helper function that creates @@ -208,7 +208,7 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str } func init() { - register.Function3x1[string, func(*string) bool, func(*string) bool, string](formatCoGBKResults) + register.Function3x1(formatCoGBKResults) // 1 input of type string => Iter1[string] register.Iter1[string]() } @@ -264,7 +264,7 @@ func sumInts(a, v int) int { } func init() { - register.Function2x1[int, int, int](sumInts) + register.Function2x1(sumInts) } func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection { @@ -378,7 +378,7 @@ func decileFn(student Student) int { } func init() { - register.Function1x1[Student, int](decileFn) + register.Function1x1(decileFn) } // [END model_multiple_pcollections_partition_fn] @@ -425,8 +425,8 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st } func init() { - register.Function3x1[string, func(*float64) bool, func(string), error](filterWordsAbove) - register.Function3x0[string, float64, func(string)](filterWordsBelow) + register.Function3x1(filterWordsAbove) + register.Function3x0(filterWordsBelow) // 1 input of type string => Emitter1[string] register.Emitter1[string]() // 1 input of type float64 => Iter1[float64] @@ -482,8 +482,8 @@ func processWordsMixed(word string, emitMarked func(string)) int { } func init() { - register.Function4x0[string, func(string), func(string), func(string)](processWords) - register.Function2x1[string, func(string), int](processWordsMixed) + register.Function4x0(processWords) + register.Function2x1(processWordsMixed) // 1 input of type string => Emitter1[string] register.Emitter1[string]() } @@ -529,7 +529,7 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { // [END model_paneinfo] 
func init() { - register.Function3x0[beam.PaneInfo, string, func(string)](extractWordsFn) + register.Function3x0(extractWordsFn) // 1 input of type string => Emitter1[string] register.Emitter1[string]() } diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 7343f85b3488..5653e16dbf15 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -821,7 +821,7 @@ func init() { // 2 inputs and 0 outputs => DoFn2x0 // 1 input => Emitter1 // Input/output types are included in order in the brackets - register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) + register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) register.Emitter1[int]() } {{< /highlight >}} @@ -1054,7 +1054,7 @@ var words beam.PCollection = ... > **Note:** Anonymous function DoFns may not work on distributed runners. -> It's recommended to use named functions and register them with `register.FunctionXxY[In1, In2, ..., InX, Out1, Out2, ..., OutY](myFunc)` in +> It's recommended to use named functions and register them with `register.FunctionXxY` in > an `init()` block. @@ -2017,7 +2017,7 @@ Call emitter functions as needed to produce 0 or more elements for its matching `PCollection`. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitter. -All emitters should be registered using a generic `register.DoFnXxY[...]` +All emitters should be registered using a generic `register.EmitterX[...]` function. This optimizes runtime execution of the emitter. 
{{< /paragraph >}} From a547214db24c32c809d2c8d80fdfc9c892a1c586 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 29 Jun 2022 12:49:21 -0400 Subject: [PATCH 5/5] Extraneous registerType --- sdks/go/examples/snippets/04transforms.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 2031bf1eac49..0d0d18c78b32 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -84,8 +84,7 @@ func splitStringPair(e stringPair) (string, string) { } func init() { - // Register element types and DoFns. - beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem()) + // Register DoFn. register.Function1x2(splitStringPair) }