From a58862f649a42c34dcc8c6600b1bbd7070dcc0c6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 6 Aug 2018 15:12:07 -0700 Subject: [PATCH 1/4] Add counter to combine example in go sdk --- sdks/go/examples/cookbook/combine/combine.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/go/examples/cookbook/combine/combine.go b/sdks/go/examples/cookbook/combine/combine.go index 7e24aa1fb307..1950a687d242 100644 --- a/sdks/go/examples/cookbook/combine/combine.go +++ b/sdks/go/examples/cookbook/combine/combine.go @@ -63,11 +63,16 @@ type extractFn struct { MinLength int `json:"min_length"` } +// A global context for simplicity. +var ctx = context.Background() + func (f *extractFn) ProcessElement(row WordRow, emit func(string, string)) { + small_words := beam.NewCounter("example.namespace", "small_words") if len(row.Word) >= f.MinLength { emit(row.Word, row.Corpus) + } else { + small_words.Inc(ctx, 1) } - // TODO(herohde) 7/14/2017: increment counter for "small words" } // TODO(herohde) 7/14/2017: the choice of a string (instead of []string) for the From 9dca44586c0d0b8c05dda4fccaf9597a22c728cf Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Thu, 26 Aug 2021 12:40:07 -0400 Subject: [PATCH 2/4] [BEAM-5097] Increment counter for small words in go SDK example --- sdks/go/examples/cookbook/combine/combine.go | 8 +++----- sdks/go/examples/wordcount/wordcount.go | 11 ++++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sdks/go/examples/cookbook/combine/combine.go b/sdks/go/examples/cookbook/combine/combine.go index 1950a687d242..cc9e468afa3b 100644 --- a/sdks/go/examples/cookbook/combine/combine.go +++ b/sdks/go/examples/cookbook/combine/combine.go @@ -34,7 +34,8 @@ var ( input = flag.String("input", "publicdata:samples.shakespeare", "Shakespeare plays BQ table.") output = flag.String("output", "", "Output BQ table.") - minLength = flag.Int("min_length", 9, "Minimum word length") + minLength = flag.Int("min_length", 9, "Minimum word length") + small_words = beam.NewCounter("extract", "small_words") ) type WordRow struct { @@ -63,11 +64,8 @@ type extractFn struct { MinLength int `json:"min_length"` } -// A global context for simplicity. -var ctx = context.Background() +func (f *extractFn) ProcessElement(ctx context.Context, row WordRow, emit func(string, string)) { -func (f *extractFn) ProcessElement(row WordRow, emit func(string, string)) { - small_words := beam.NewCounter("example.namespace", "small_words") if len(row.Word) >= f.MinLength { emit(row.Word, row.Corpus) } else { diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go index f28d1254d427..515843d8e5ae 100644 --- a/sdks/go/examples/wordcount/wordcount.go +++ b/sdks/go/examples/wordcount/wordcount.go @@ -103,9 +103,11 @@ var ( // for easy reuse, modular testing, and an improved monitoring experience. var ( - wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) - empty = beam.NewCounter("extract", "emptyLines") - lineLen = beam.NewDistribution("extract", "lineLenDistro") + wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + empty = beam.NewCounter("extract", "emptyLines") + minLength = flag.Int("min_length", 9, "Minimum word length") + small_words = beam.NewCounter("extract", "small_words") + lineLen = beam.NewDistribution("extract", "lineLenDistro") ) // extractFn is a DoFn that emits the words in a given line. @@ -115,6 +117,9 @@ func extractFn(ctx context.Context, line string, emit func(string)) { empty.Inc(ctx, 1) } for _, word := range wordRE.FindAllString(line, -1) { + if len(word) < *minLength { + small_words.Inc(ctx, 1) + } emit(word) } } From c8862c469b8e5f3b3dfc08407b4cef882d6b5ea2 Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Tue, 31 Aug 2021 10:06:28 -0400 Subject: [PATCH 3/4] [BEAM-5097] comments and formatting for small words counter --- sdks/go/examples/cookbook/combine/combine.go | 1 - sdks/go/examples/wordcount/wordcount.go | 14 ++++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/go/examples/cookbook/combine/combine.go b/sdks/go/examples/cookbook/combine/combine.go index a9340f41ed47..33023b28f710 100644 --- a/sdks/go/examples/cookbook/combine/combine.go +++ b/sdks/go/examples/cookbook/combine/combine.go @@ -69,7 +69,6 @@ type extractFn struct { } func (f *extractFn) ProcessElement(ctx context.Context, row WordRow, emit func(string, string)) { - if len(row.Word) >= f.MinLength { emit(row.Word, row.Corpus) } else { diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go index 24948664dac1..8aa815854e22 100644 --- a/sdks/go/examples/wordcount/wordcount.go +++ b/sdks/go/examples/wordcount/wordcount.go @@ -111,11 +111,11 @@ func init() { } var ( - wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) - empty = beam.NewCounter("extract", "emptyLines") - minLength = flag.Int("min_length", 9, "Minimum word length") - small_words = beam.NewCounter("extract", "small_words") - lineLen = beam.NewDistribution("extract", "lineLenDistro") + wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + empty = beam.NewCounter("extract", "emptyLines") + small_word_length = flag.Int("small_word_length", 9, "small_word_length") + small_words = beam.NewCounter("extract", "small_words") + lineLen = beam.NewDistribution("extract", "lineLenDistro") ) // extractFn is a DoFn that emits the words in a given line. @@ -125,7 +125,9 @@ func extractFn(ctx context.Context, line string, emit func(string)) { empty.Inc(ctx, 1) } for _, word := range wordRE.FindAllString(line, -1) { - if len(word) < *minLength { + // increment the counter for small words if length of words is + // less than small_word_length + if len(word) < *small_word_length { small_words.Inc(ctx, 1) } emit(word) From bdb69913988c4131050216c7ec5b2c74d587a585 Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Wed, 8 Sep 2021 11:20:27 -0400 Subject: [PATCH 4/4] [BEAM-5097] made extractFn structural DoFn in wordcount example --- sdks/go/examples/wordcount/wordcount.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go index 8aa815854e22..08d9cb57f4e4 100644 --- a/sdks/go/examples/wordcount/wordcount.go +++ b/sdks/go/examples/wordcount/wordcount.go @@ -106,7 +106,6 @@ var ( // done automatically by the starcgen code generator, or it can be done manually // by calling beam.RegisterFunction in an init() call. func init() { - beam.RegisterFunction(extractFn) beam.RegisterFunction(formatFn) } @@ -118,8 +117,12 @@ var ( lineLen = beam.NewDistribution("extract", "lineLenDistro") ) -// extractFn is a DoFn that emits the words in a given line. -func extractFn(ctx context.Context, line string, emit func(string)) { +// extractFn is a DoFn that emits the words in a given line and keeps a count for small words. +type extractFn struct { + SmallWordLength int `json:"min_length"` +} + +func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) { lineLen.Update(ctx, int64(len(line))) if len(strings.TrimSpace(line)) == 0 { empty.Inc(ctx, 1) @@ -127,7 +130,7 @@ func extractFn(ctx context.Context, line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { // increment the counter for small words if length of words is // less than small_word_length - if len(word) < *small_word_length { + if len(word) < f.SmallWordLength { small_words.Inc(ctx, 1) } emit(word) @@ -157,7 +160,7 @@ func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. - col := beam.ParDo(s, extractFn, lines) + col := beam.ParDo(s, &extractFn{SmallWordLength: *small_word_length}, lines) // Count the number of times each word occurs. return stats.Count(s, col)