diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index d8c60dd67662..f9ccdc065e88 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -163,43 +163,6 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
     }
 
 
-class WriteToBigQuery(beam.PTransform):
-  """Generate, format, and write BigQuery table row information."""
-  def __init__(self, table_name, dataset, schema):
-    """Initializes the transform.
-    Args:
-      table_name: Name of the BigQuery table to use.
-      dataset: Name of the dataset to use.
-      schema: Dictionary in the format {'column_name': 'bigquery_type'}
-    """
-    super(WriteToBigQuery, self).__init__()
-    self.table_name = table_name
-    self.dataset = dataset
-    self.schema = schema
-
-  def get_schema(self):
-    """Build the output table schema."""
-    return ', '.join(
-        '%s:%s' % (col, self.schema[col]) for col in self.schema)
-
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
-
-  def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
-    return (
-        pcoll
-        | 'ConvertToRow' >> beam.Map(
-            lambda elem: {col: elem[col] for col in self.schema})
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
-
-
 # [START abuse_detect]
 class CalculateSpammyUsers(beam.PTransform):
   """Filter out all but those users with a high clickrate, which we will
@@ -280,7 +243,8 @@ def run(argv=None):
   options = PipelineOptions(pipeline_args)
 
   # We also require the --project option to access --dataset
-  if options.view_as(GoogleCloudOptions).project is None:
+  project = options.view_as(GoogleCloudOptions).project
+  if project is None:
     parser.print_usage()
     print(sys.argv[0] + ': error: argument --project is required')
     sys.exit(1)
@@ -296,6 +260,8 @@ def run(argv=None):
   # Enforce that this pipeline is always run in streaming mode
   options.view_as(StandardOptions).streaming = True
 
+  table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name)
+
   with beam.Pipeline(options=options) as p:
     # Read events from Pub/Sub using custom timestamps
     raw_events = (
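Note on the new `table_spec_prefix`: `beam.io.WriteToBigQuery` accepts a table reference in the `'PROJECT:DATASET.TABLE'` string form, which is exactly what this prefix builds; each sink below only appends a per-table suffix. A minimal sketch of the convention, using hypothetical names:

```python
# Hypothetical values, for illustration only.
project, dataset, table_name = 'my-project', 'game_stats', 'leaderboard'

# 'PROJECT:DATASET.TABLE' is the string form beam.io.WriteToBigQuery parses.
table_spec_prefix = '{}:{}.{}'.format(project, dataset, table_name)
assert table_spec_prefix == 'my-project:game_stats.leaderboard'
assert table_spec_prefix + '_teams' == 'my-project:game_stats.leaderboard_teams'
```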
@@ -332,6 +298,13 @@ def run(argv=None):
     # updates for late data. Uses the side input derived above --the set of
     # suspected robots-- to filter out scores from those users from the sum.
     # Write the results to BigQuery.
+    team_table_spec = table_spec_prefix + '_teams'
+    team_table_schema = (
+        'team:STRING, '
+        'total_score:INTEGER, '
+        'window_start:STRING, '
+        'processing_time:STRING')
+
     (raw_events  # pylint: disable=expression-not-assigned
      | 'WindowIntoFixedWindows' >> beam.WindowInto(
          beam.window.FixedWindows(fixed_window_duration))
@@ -344,19 +317,20 @@ def run(argv=None):
      | 'ExtractAndSumScore' >> ExtractAndSumScore('team')
      # [END filter_and_calc]
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
-     | 'WriteTeamScoreSums' >> WriteToBigQuery(
-         args.table_name + '_teams', args.dataset, {
-             'team': 'STRING',
-             'total_score': 'INTEGER',
-             'window_start': 'STRING',
-             'processing_time': 'STRING',
-         }))
+     | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
+         team_table_spec,
+         schema=team_table_schema,
+         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
 
     # [START session_calc]
     # Detect user sessions-- that is, a burst of activity separated by a gap
     # from further activity. Find and record the mean session lengths.
     # This information could help the game designers track the changing user
     # engagement as their set of game changes.
+    session_table_spec = table_spec_prefix + '_sessions'
+    session_table_schema = 'mean_duration:FLOAT'
+
     (user_events  # pylint: disable=expression-not-assigned
      | 'WindowIntoSessions' >> beam.WindowInto(
          beam.window.Sessions(session_gap),
@@ -381,10 +355,11 @@ def run(argv=None):
      | beam.CombineGlobally(beam.combiners.MeanCombineFn()).without_defaults()
      | 'FormatAvgSessionLength' >> beam.Map(
          lambda elem: {'mean_duration': float(elem)})
-     | 'WriteAvgSessionLength' >> WriteToBigQuery(
-         args.table_name + '_sessions', args.dataset, {
-             'mean_duration': 'FLOAT',
-         }))
+     | 'WriteAvgSessionLength' >> beam.io.WriteToBigQuery(
+         session_table_spec,
+         schema=session_table_schema,
+         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
 
     # [END rewindow]
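The comma-separated `'name:TYPE'` strings used for the schemas above are shorthand. Should a field ever need an explicit mode or nested structure, the same schema can be spelled out as a `TableSchema` object; a sketch, using the import path Beam's BigQuery examples use:

```python
from apache_beam.io.gcp.internal.clients import bigquery

# Equivalent to 'team:STRING, total_score:INTEGER, window_start:STRING,
# processing_time:STRING', but with the field mode made explicit.
table_schema = bigquery.TableSchema()
for name, field_type in [('team', 'STRING'), ('total_score', 'INTEGER'),
                         ('window_start', 'STRING'),
                         ('processing_time', 'STRING')]:
  field = bigquery.TableFieldSchema()
  field.name = name
  field.type = field_type
  field.mode = 'NULLABLE'
  table_schema.fields.append(field)
```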
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index b286a6a5ddf4..6e826d45feed 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -156,43 +156,6 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
     }
 
 
-class WriteToBigQuery(beam.PTransform):
-  """Generate, format, and write BigQuery table row information."""
-  def __init__(self, table_name, dataset, schema):
-    """Initializes the transform.
-    Args:
-      table_name: Name of the BigQuery table to use.
-      dataset: Name of the dataset to use.
-      schema: Dictionary in the format {'column_name': 'bigquery_type'}
-    """
-    super(WriteToBigQuery, self).__init__()
-    self.table_name = table_name
-    self.dataset = dataset
-    self.schema = schema
-
-  def get_schema(self):
-    """Build the output table schema."""
-    return ', '.join(
-        '%s:%s' % (col, self.schema[col]) for col in self.schema)
-
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
-
-  def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
-    return (
-        pcoll
-        | 'ConvertToRow' >> beam.Map(
-            lambda elem: {col: elem[col] for col in self.schema})
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
-
-
 # [START main]
 class HourlyTeamScore(beam.PTransform):
   def __init__(self, start_min, stop_min, window_duration):
@@ -278,7 +241,8 @@ def run(argv=None):
   options = PipelineOptions(pipeline_args)
 
   # We also require the --project option to access --dataset
-  if options.view_as(GoogleCloudOptions).project is None:
+  project = options.view_as(GoogleCloudOptions).project
+  if project is None:
     parser.print_usage()
     print(sys.argv[0] + ': error: argument --project is required')
     sys.exit(1)
@@ -287,18 +251,23 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   options.view_as(SetupOptions).save_main_session = True
 
+  table_spec = '{}:{}.{}'.format(project, args.dataset, args.table_name)
+  table_schema = (
+      'team:STRING, '
+      'total_score:INTEGER, '
+      'window_start:STRING')
+
   with beam.Pipeline(options=options) as p:
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)
      | 'HourlyTeamScore' >> HourlyTeamScore(
          args.start_min, args.stop_min, args.window_duration)
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
-     | 'WriteTeamScoreSums' >> WriteToBigQuery(
-         args.table_name, args.dataset, {
-             'team': 'STRING',
-             'total_score': 'INTEGER',
-             'window_start': 'STRING',
-         }))
+     | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
+         table_spec,
+         schema=table_schema,
+         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
 
 # [END main]
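One contract worth noting across these three files: the deleted transform's 'ConvertToRow' step projected each element onto the schema's columns, whereas the built-in sink expects elements to already be dicts keyed by field name (which `TeamScoresDict` guarantees here). A minimal, self-contained sketch of that contract, with hypothetical project/dataset/table names:

```python
import apache_beam as beam

# Sketch only, not this PR's pipeline: rows handed to the built-in sink are
# plain dicts whose keys match the declared schema fields.
with beam.Pipeline() as p:
  _ = (p
       | beam.Create([{'team': 'red', 'total_score': 12,
                       'window_start': '2018-01-01 00:00:00.000'}])
       | beam.io.WriteToBigQuery(
           'my-project:my_dataset.hourly_scores',  # hypothetical table spec
           schema='team:STRING, total_score:INTEGER, window_start:STRING',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```

`CREATE_IF_NEEDED` plus `WRITE_APPEND` preserves the old transform's behavior exactly: the table is created on first run and rows accumulate across runs.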
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index e207f26712e3..0d1fce47663d 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -171,43 +171,6 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
     }
 
 
-class WriteToBigQuery(beam.PTransform):
-  """Generate, format, and write BigQuery table row information."""
-  def __init__(self, table_name, dataset, schema):
-    """Initializes the transform.
-    Args:
-      table_name: Name of the BigQuery table to use.
-      dataset: Name of the dataset to use.
-      schema: Dictionary in the format {'column_name': 'bigquery_type'}
-    """
-    super(WriteToBigQuery, self).__init__()
-    self.table_name = table_name
-    self.dataset = dataset
-    self.schema = schema
-
-  def get_schema(self):
-    """Build the output table schema."""
-    return ', '.join(
-        '%s:%s' % (col, self.schema[col]) for col in self.schema)
-
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
-
-  def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
-    return (
-        pcoll
-        | 'ConvertToRow' >> beam.Map(
-            lambda elem: {col: elem[col] for col in self.schema})
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
-
-
 # [START window_and_trigger]
 class CalculateTeamScores(beam.PTransform):
   """Calculates scores for each team within the configured window duration.
@@ -294,7 +257,8 @@ def run(argv=None):
   options = PipelineOptions(pipeline_args)
 
   # We also require the --project option to access --dataset
-  if options.view_as(GoogleCloudOptions).project is None:
+  project = options.view_as(GoogleCloudOptions).project
+  if project is None:
     parser.print_usage()
     print(sys.argv[0] + ': error: argument --project is required')
     sys.exit(1)
@@ -306,6 +270,8 @@ def run(argv=None):
   # Enforce that this pipeline is always run in streaming mode
   options.view_as(StandardOptions).streaming = True
 
+  table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name)
+
   with beam.Pipeline(options=options) as p:
     # Read game events from Pub/Sub using custom timestamps, which are extracted
     # from the pubsub data elements, and parse the data.
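A small behavioral note on the `--project` refactor repeated in each file: the deleted transform looked up the project inside `expand()` via `pcoll.pipeline.options`, while the new code reads it once in `run()`. Both lookups happen at graph-construction time from the same options view, so the resulting table reference is unchanged. A sketch of the lookup, with a hypothetical project id:

```python
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions)

# Old and new code ultimately read the same option.
options = PipelineOptions(['--project', 'my-project'])  # hypothetical flag
project = options.view_as(GoogleCloudOptions).project
assert project == 'my-project'
```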
@@ -316,32 +282,37 @@ def run(argv=None):
         | 'AddEventTimestamps' >> beam.Map(
             lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))
 
+    team_table_spec = table_spec_prefix + '_teams'
+    team_table_schema = (
+        'team:STRING, '
+        'total_score:INTEGER, '
+        'window_start:STRING, '
+        'processing_time:STRING')
+
     # Get team scores and write the results to BigQuery
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateTeamScores' >> CalculateTeamScores(
          args.team_window_duration, args.allowed_lateness)
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
-     | 'WriteTeamScoreSums' >> WriteToBigQuery(
-         args.table_name + '_teams', args.dataset, {
-             'team': 'STRING',
-             'total_score': 'INTEGER',
-             'window_start': 'STRING',
-             'processing_time': 'STRING',
-         }))
-
-    def format_user_score_sums(user_score):
-      (user, score) = user_score
-      return {'user': user, 'total_score': score}
+     | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
+         team_table_spec,
+         schema=team_table_schema,
+         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+    user_table_spec = table_spec_prefix + '_users'
+    user_table_schema = 'user:STRING, total_score:INTEGER'
 
     # Get user scores and write the results to BigQuery
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
-     | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-     | 'WriteUserScoreSums' >> WriteToBigQuery(
-         args.table_name + '_users', args.dataset, {
-             'user': 'STRING',
-             'total_score': 'INTEGER',
-         }))
+     | 'FormatUserScoreSums' >> beam.Map(
+         lambda elem: {'user': elem[0], 'total_score': elem[1]})
+     | 'WriteUserScoreSums' >> beam.io.WriteToBigQuery(
+         user_table_spec,
+         schema=user_table_schema,
+         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
 
 
 if __name__ == '__main__':
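Sanity check on the 'FormatUserScoreSums' change: elements arriving there are (user, total_score) pairs, so the inlined lambda reproduces the deleted helper exactly:

```python
# Equivalent to the removed format_user_score_sums helper.
format_fn = lambda elem: {'user': elem[0], 'total_score': elem[1]}
assert format_fn(('alice', 42)) == {'user': 'alice', 'total_score': 42}
```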