From 75e286ad66f94fa8eb81c643744adabe5690cb80 Mon Sep 17 00:00:00 2001 From: Charles Chen Date: Fri, 9 Mar 2018 16:19:55 -0800 Subject: [PATCH] Revert #4666 "Use beam.io.WriteToBigQuery()" That change broke execution of hourly_team_score. --- .../examples/complete/game/game_stats.py | 73 +++++++++++------ .../complete/game/hourly_team_score.py | 57 ++++++++++--- .../examples/complete/game/leader_board.py | 81 +++++++++++++------ 3 files changed, 148 insertions(+), 63 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py index 32f6f15e3a83..1f13ed180f6a 100644 --- a/sdks/python/apache_beam/examples/complete/game/game_stats.py +++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py @@ -163,6 +163,43 @@ def process(self, team_score, window=beam.DoFn.WindowParam): } +class WriteToBigQuery(beam.PTransform): + """Generate, format, and write BigQuery table row information.""" + def __init__(self, table_name, dataset, schema): + """Initializes the transform. + Args: + table_name: Name of the BigQuery table to use. + dataset: Name of the dataset to use. + schema: Dictionary in the format {'column_name': 'bigquery_type'} + """ + super(WriteToBigQuery, self).__init__() + self.table_name = table_name + self.dataset = dataset + self.schema = schema + + def get_schema(self): + """Build the output table schema.""" + return ', '.join( + '%s:%s' % (col, self.schema[col]) for col in self.schema) + + def get_table(self, pipeline): + """Utility to construct an output table reference.""" + project = pipeline.options.view_as(GoogleCloudOptions).project + return '%s:%s.%s' % (project, self.dataset, self.table_name) + + def expand(self, pcoll): + table = self.get_table(pcoll.pipeline) + return ( + pcoll + | 'ConvertToRow' >> beam.Map( + lambda elem: {col: elem[col] for col in self.schema}) + | beam.io.Write(beam.io.BigQuerySink( + table, + schema=self.get_schema(), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) + + # [START abuse_detect] class CalculateSpammyUsers(beam.PTransform): """Filter out all but those users with a high clickrate, which we will @@ -243,8 +280,7 @@ def run(argv=None): options = PipelineOptions(pipeline_args) # We also require the --project option to access --dataset - project = options.view_as(GoogleCloudOptions).project - if project is None: + if options.view_as(GoogleCloudOptions).project is None: parser.print_usage() print(sys.argv[0] + ': error: argument --project is required') sys.exit(1) @@ -260,8 +296,6 @@ def run(argv=None): # Enforce that this pipeline is always run in streaming mode options.view_as(StandardOptions).streaming = True - table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name) - with beam.Pipeline(options=options) as p: # Read events from Pub/Sub using custom timestamps raw_events = ( @@ -298,13 +332,6 @@ def run(argv=None): # updates for late data. Uses the side input derived above --the set of # suspected robots-- to filter out scores from those users from the sum. # Write the results to BigQuery. - team_table_spec = table_spec_prefix + '_teams' - team_table_schema = ( - 'team:STRING, ' - 'total_score:INTEGER, ' - 'window_start:STRING, ' - 'processing_time: STRING') - (raw_events # pylint: disable=expression-not-assigned | 'WindowIntoFixedWindows' >> beam.WindowInto( beam.window.FixedWindows(fixed_window_duration)) @@ -317,20 +344,19 @@ def run(argv=None): | 'ExtractAndSumScore' >> ExtractAndSumScore('team') # [END filter_and_calc] | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict()) - | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery( - team_table_spec, - schema=team_table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) + | 'WriteTeamScoreSums' >> WriteToBigQuery( + args.table_name + '_teams', args.dataset, { + 'team': 'STRING', + 'total_score': 'INTEGER', + 'window_start': 'STRING', + 'processing_time': 'STRING', + })) # [START session_calc] # Detect user sessions-- that is, a burst of activity separated by a gap # from further activity. Find and record the mean session lengths. # This information could help the game designers track the changing user # engagement as their set of game changes. - session_table_spec = table_spec_prefix + '_sessions' - session_table_schema = 'mean_duration:FLOAT' - (user_events # pylint: disable=expression-not-assigned | 'WindowIntoSessions' >> beam.WindowInto( beam.window.Sessions(session_gap), @@ -355,11 +381,10 @@ def run(argv=None): | beam.CombineGlobally(beam.combiners.MeanCombineFn()).without_defaults() | 'FormatAvgSessionLength' >> beam.Map( lambda elem: {'mean_duration': float(elem)}) - | 'WriteAvgSessionLength' >> beam.io.WriteToBigQuery( - session_table_spec, - schema=session_table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) + | 'WriteAvgSessionLength' >> WriteToBigQuery( + args.table_name + '_sessions', args.dataset, { + 'mean_duration': 'FLOAT', + })) # [END rewindow] diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py index 6e826d45feed..b286a6a5ddf4 100644 --- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py +++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py @@ -156,6 +156,43 @@ def process(self, team_score, window=beam.DoFn.WindowParam): } +class WriteToBigQuery(beam.PTransform): + """Generate, format, and write BigQuery table row information.""" + def __init__(self, table_name, dataset, schema): + """Initializes the transform. + Args: + table_name: Name of the BigQuery table to use. + dataset: Name of the dataset to use. + schema: Dictionary in the format {'column_name': 'bigquery_type'} + """ + super(WriteToBigQuery, self).__init__() + self.table_name = table_name + self.dataset = dataset + self.schema = schema + + def get_schema(self): + """Build the output table schema.""" + return ', '.join( + '%s:%s' % (col, self.schema[col]) for col in self.schema) + + def get_table(self, pipeline): + """Utility to construct an output table reference.""" + project = pipeline.options.view_as(GoogleCloudOptions).project + return '%s:%s.%s' % (project, self.dataset, self.table_name) + + def expand(self, pcoll): + table = self.get_table(pcoll.pipeline) + return ( + pcoll + | 'ConvertToRow' >> beam.Map( + lambda elem: {col: elem[col] for col in self.schema}) + | beam.io.Write(beam.io.BigQuerySink( + table, + schema=self.get_schema(), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) + + # [START main] class HourlyTeamScore(beam.PTransform): def __init__(self, start_min, stop_min, window_duration): @@ -241,8 +278,7 @@ def run(argv=None): options = PipelineOptions(pipeline_args) # We also require the --project option to access --dataset - project = options.view_as(GoogleCloudOptions).project - if project is None: + if options.view_as(GoogleCloudOptions).project is None: parser.print_usage() print(sys.argv[0] + ': error: argument --project is required') sys.exit(1) @@ -251,23 +287,18 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). options.view_as(SetupOptions).save_main_session = True - table_spec = '{}:{}.{}'.format(project, args.dataset, args.table_name) - table_schema = ( - 'team:STRING, ' - 'total_score:INTEGER, ' - 'window_start:STRING') - with beam.Pipeline(options=options) as p: (p # pylint: disable=expression-not-assigned | 'ReadInputText' >> beam.io.ReadFromText(args.input) | 'HourlyTeamScore' >> HourlyTeamScore( args.start_min, args.stop_min, args.window_duration) | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict()) - | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery( - table_spec, - schema=table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) + | 'WriteTeamScoreSums' >> WriteToBigQuery( + args.table_name, args.dataset, { + 'team': 'STRING', + 'total_score': 'INTEGER', + 'window_start': 'STRING', + })) # [END main] diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py index 0d1fce47663d..e207f26712e3 100644 --- a/sdks/python/apache_beam/examples/complete/game/leader_board.py +++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py @@ -171,6 +171,43 @@ def process(self, team_score, window=beam.DoFn.WindowParam): } +class WriteToBigQuery(beam.PTransform): + """Generate, format, and write BigQuery table row information.""" + def __init__(self, table_name, dataset, schema): + """Initializes the transform. + Args: + table_name: Name of the BigQuery table to use. + dataset: Name of the dataset to use. + schema: Dictionary in the format {'column_name': 'bigquery_type'} + """ + super(WriteToBigQuery, self).__init__() + self.table_name = table_name + self.dataset = dataset + self.schema = schema + + def get_schema(self): + """Build the output table schema.""" + return ', '.join( + '%s:%s' % (col, self.schema[col]) for col in self.schema) + + def get_table(self, pipeline): + """Utility to construct an output table reference.""" + project = pipeline.options.view_as(GoogleCloudOptions).project + return '%s:%s.%s' % (project, self.dataset, self.table_name) + + def expand(self, pcoll): + table = self.get_table(pcoll.pipeline) + return ( + pcoll + | 'ConvertToRow' >> beam.Map( + lambda elem: {col: elem[col] for col in self.schema}) + | beam.io.Write(beam.io.BigQuerySink( + table, + schema=self.get_schema(), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) + + # [START window_and_trigger] class CalculateTeamScores(beam.PTransform): """Calculates scores for each team within the configured window duration. @@ -257,8 +294,7 @@ def run(argv=None): options = PipelineOptions(pipeline_args) # We also require the --project option to access --dataset - project = options.view_as(GoogleCloudOptions).project - if project is None: + if options.view_as(GoogleCloudOptions).project is None: parser.print_usage() print(sys.argv[0] + ': error: argument --project is required') sys.exit(1) @@ -270,8 +306,6 @@ def run(argv=None): # Enforce that this pipeline is always run in streaming mode options.view_as(StandardOptions).streaming = True - table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name) - with beam.Pipeline(options=options) as p: # Read game events from Pub/Sub using custom timestamps, which are extracted # from the pubsub data elements, and parse the data. @@ -282,37 +316,32 @@ def run(argv=None): | 'AddEventTimestamps' >> beam.Map( lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))) - team_table_spec = table_spec_prefix + '_teams' - team_table_schema = ( - 'team:STRING, ' - 'total_score:INTEGER, ' - 'window_start:STRING, ' - 'processing_time: STRING') - # Get team scores and write the results to BigQuery (events # pylint: disable=expression-not-assigned | 'CalculateTeamScores' >> CalculateTeamScores( args.team_window_duration, args.allowed_lateness) | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict()) - | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery( - team_table_spec, - schema=team_table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) - - user_table_spec = table_spec_prefix + '_users' - user_table_schema = 'user:STRING, total_score:INTEGER' + | 'WriteTeamScoreSums' >> WriteToBigQuery( + args.table_name + '_teams', args.dataset, { + 'team': 'STRING', + 'total_score': 'INTEGER', + 'window_start': 'STRING', + 'processing_time': 'STRING', + })) + + def format_user_score_sums(user_score): + (user, score) = user_score + return {'user': user, 'total_score': score} # Get user scores and write the results to BigQuery (events # pylint: disable=expression-not-assigned | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness) - | 'FormatUserScoreSums' >> beam.Map( - lambda elem: {'user': elem[0], 'total_score': elem[1]}) - | 'WriteUserScoreSums' >> beam.io.WriteToBigQuery( - user_table_spec, - schema=user_table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) + | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums) + | 'WriteUserScoreSums' >> WriteToBigQuery( + args.table_name + '_users', args.dataset, { + 'user': 'STRING', + 'total_score': 'INTEGER', + })) if __name__ == '__main__':