diff --git a/sdks/go/examples/cookbook/combine/combine.go b/sdks/go/examples/cookbook/combine/combine.go index 205f5156e322..33023b28f710 100644 --- a/sdks/go/examples/cookbook/combine/combine.go +++ b/sdks/go/examples/cookbook/combine/combine.go @@ -34,7 +34,8 @@ var ( input = flag.String("input", "publicdata:samples.shakespeare", "Shakespeare plays BQ table.") output = flag.String("output", "", "Output BQ table.") - minLength = flag.Int("min_length", 9, "Minimum word length") + minLength = flag.Int("min_length", 9, "Minimum word length") + small_words = beam.NewCounter("extract", "small_words") ) func init() { @@ -67,11 +68,12 @@ type extractFn struct { MinLength int `json:"min_length"` } -func (f *extractFn) ProcessElement(row WordRow, emit func(string, string)) { +func (f *extractFn) ProcessElement(ctx context.Context, row WordRow, emit func(string, string)) { if len(row.Word) >= f.MinLength { emit(row.Word, row.Corpus) + } else { + small_words.Inc(ctx, 1) } - // TODO(herohde) 7/14/2017: increment counter for "small words" } // TODO(herohde) 7/14/2017: the choice of a string (instead of []string) for the diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go index b2eb67d3b1bc..08d9cb57f4e4 100644 --- a/sdks/go/examples/wordcount/wordcount.go +++ b/sdks/go/examples/wordcount/wordcount.go @@ -106,23 +106,33 @@ var ( // done automatically by the starcgen code generator, or it can be done manually // by calling beam.RegisterFunction in an init() call. func init() { - beam.RegisterFunction(extractFn) beam.RegisterFunction(formatFn) } var ( - wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) - empty = beam.NewCounter("extract", "emptyLines") - lineLen = beam.NewDistribution("extract", "lineLenDistro") + wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + empty = beam.NewCounter("extract", "emptyLines") + small_word_length = flag.Int("small_word_length", 9, "small_word_length") + small_words = beam.NewCounter("extract", "small_words") + lineLen = beam.NewDistribution("extract", "lineLenDistro") ) -// extractFn is a DoFn that emits the words in a given line. -func extractFn(ctx context.Context, line string, emit func(string)) { +// extractFn is a DoFn that emits the words in a given line and keeps a count for small words. +type extractFn struct { + SmallWordLength int `json:"min_length"` +} + +func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) { lineLen.Update(ctx, int64(len(line))) if len(strings.TrimSpace(line)) == 0 { empty.Inc(ctx, 1) } for _, word := range wordRE.FindAllString(line, -1) { + // increment the counter for small words if length of words is + // less than small_word_length + if len(word) < f.SmallWordLength { + small_words.Inc(ctx, 1) + } emit(word) } } @@ -150,7 +160,7 @@ func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. - col := beam.ParDo(s, extractFn, lines) + col := beam.ParDo(s, &extractFn{SmallWordLength: *small_word_length}, lines) // Count the number of times each word occurs. return stats.Count(s, col)