73 changes: 24 additions & 49 deletions sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -163,43 +163,6 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
}


class WriteToBigQuery(beam.PTransform):
"""Generate, format, and write BigQuery table row information."""
def __init__(self, table_name, dataset, schema):
"""Initializes the transform.
Args:
table_name: Name of the BigQuery table to use.
dataset: Name of the dataset to use.
schema: Dictionary in the format {'column_name': 'bigquery_type'}
"""
super(WriteToBigQuery, self).__init__()
self.table_name = table_name
self.dataset = dataset
self.schema = schema

def get_schema(self):
"""Build the output table schema."""
return ', '.join(
'%s:%s' % (col, self.schema[col]) for col in self.schema)

def get_table(self, pipeline):
"""Utility to construct an output table reference."""
project = pipeline.options.view_as(GoogleCloudOptions).project
return '%s:%s.%s' % (project, self.dataset, self.table_name)

def expand(self, pcoll):
table = self.get_table(pcoll.pipeline)
return (
pcoll
| 'ConvertToRow' >> beam.Map(
lambda elem: {col: elem[col] for col in self.schema})
| beam.io.Write(beam.io.BigQuerySink(
table,
schema=self.get_schema(),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))


# [START abuse_detect]
class CalculateSpammyUsers(beam.PTransform):
"""Filter out all but those users with a high clickrate, which we will
@@ -280,7 +243,8 @@ def run(argv=None):
options = PipelineOptions(pipeline_args)

# We also require the --project option to access --dataset
if options.view_as(GoogleCloudOptions).project is None:
project = options.view_as(GoogleCloudOptions).project
if project is None:
parser.print_usage()
print(sys.argv[0] + ': error: argument --project is required')
sys.exit(1)
@@ -296,6 +260,8 @@ def run(argv=None):
# Enforce that this pipeline is always run in streaming mode
options.view_as(StandardOptions).streaming = True

table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name)

with beam.Pipeline(options=options) as p:
# Read events from Pub/Sub using custom timestamps
raw_events = (
@@ -332,6 +298,13 @@ def run(argv=None):
# updates for late data. Uses the side input derived above --the set of
# suspected robots-- to filter out scores from those users from the sum.
# Write the results to BigQuery.
team_table_spec = table_spec_prefix + '_teams'
team_table_schema = (
'team:STRING, '
'total_score:INTEGER, '
'window_start:STRING, '
'processing_time:STRING')

(raw_events # pylint: disable=expression-not-assigned
| 'WindowIntoFixedWindows' >> beam.WindowInto(
beam.window.FixedWindows(fixed_window_duration))
@@ -344,19 +317,20 @@ def run(argv=None):
| 'ExtractAndSumScore' >> ExtractAndSumScore('team')
# [END filter_and_calc]
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name + '_teams', args.dataset, {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
}))
| 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
team_table_spec,
schema=team_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# [START session_calc]
# Detect user sessions-- that is, a burst of activity separated by a gap
# from further activity. Find and record the mean session lengths.
# This information could help the game designers track the changing user
# engagement as their set of game changes.
session_table_spec = table_spec_prefix + '_sessions'
session_table_schema = 'mean_duration:FLOAT'

(user_events # pylint: disable=expression-not-assigned
| 'WindowIntoSessions' >> beam.WindowInto(
beam.window.Sessions(session_gap),
Expand All @@ -381,10 +355,11 @@ def run(argv=None):
| beam.CombineGlobally(beam.combiners.MeanCombineFn()).without_defaults()
| 'FormatAvgSessionLength' >> beam.Map(
lambda elem: {'mean_duration': float(elem)})
| 'WriteAvgSessionLength' >> WriteToBigQuery(
args.table_name + '_sessions', args.dataset, {
'mean_duration': 'FLOAT',
}))
| 'WriteAvgSessionLength' >> beam.io.WriteToBigQuery(
session_table_spec,
schema=session_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
# [END rewindow]


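For context on the replacement pattern used throughout this change: the examples now call the SDK's built-in `beam.io.WriteToBigQuery` directly, with the table in `project:dataset.table` spec form and the schema as a comma-separated `name:TYPE` string. A minimal, self-contained sketch of that usage follows; the project, dataset, and table names here are hypothetical placeholders, not values from this PR.

```python
# Minimal sketch of the built-in transform that replaces the custom
# WriteToBigQuery PTransform deleted above. The table spec
# 'my-project:my_dataset.scores' is a hypothetical placeholder; the schema
# is the comma-separated 'name:TYPE' string beam.io.WriteToBigQuery accepts.
import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateRows' >> beam.Create([{'team': 'ocelot', 'total_score': 32}])
      | 'WriteRows' >> beam.io.WriteToBigQuery(
          'my-project:my_dataset.scores',
          schema='team:STRING, total_score:INTEGER',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```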
57 changes: 13 additions & 44 deletions sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -156,43 +156,6 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
}


class WriteToBigQuery(beam.PTransform):
"""Generate, format, and write BigQuery table row information."""
def __init__(self, table_name, dataset, schema):
"""Initializes the transform.
Args:
table_name: Name of the BigQuery table to use.
dataset: Name of the dataset to use.
schema: Dictionary in the format {'column_name': 'bigquery_type'}
"""
super(WriteToBigQuery, self).__init__()
self.table_name = table_name
self.dataset = dataset
self.schema = schema

def get_schema(self):
"""Build the output table schema."""
return ', '.join(
'%s:%s' % (col, self.schema[col]) for col in self.schema)

def get_table(self, pipeline):
"""Utility to construct an output table reference."""
project = pipeline.options.view_as(GoogleCloudOptions).project
return '%s:%s.%s' % (project, self.dataset, self.table_name)

def expand(self, pcoll):
table = self.get_table(pcoll.pipeline)
return (
pcoll
| 'ConvertToRow' >> beam.Map(
lambda elem: {col: elem[col] for col in self.schema})
| beam.io.Write(beam.io.BigQuerySink(
table,
schema=self.get_schema(),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))


# [START main]
class HourlyTeamScore(beam.PTransform):
def __init__(self, start_min, stop_min, window_duration):
@@ -278,7 +241,8 @@ def run(argv=None):
options = PipelineOptions(pipeline_args)

# We also require the --project option to access --dataset
if options.view_as(GoogleCloudOptions).project is None:
project = options.view_as(GoogleCloudOptions).project
if project is None:
parser.print_usage()
print(sys.argv[0] + ': error: argument --project is required')
sys.exit(1)
@@ -287,18 +251,23 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
options.view_as(SetupOptions).save_main_session = True

table_spec = '{}:{}.{}'.format(project, args.dataset, args.table_name)
table_schema = (
'team:STRING, '
'total_score:INTEGER, '
'window_start:STRING')

with beam.Pipeline(options=options) as p:
(p # pylint: disable=expression-not-assigned
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
| 'HourlyTeamScore' >> HourlyTeamScore(
args.start_min, args.stop_min, args.window_duration)
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name, args.dataset, {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
}))
| 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
# [END main]


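One difference from the deleted helper worth noting: the old transform took a `{'column': 'TYPE'}` dict and built the schema string itself in `get_schema()`, while the built-in transform takes the string directly. A hypothetical one-liner showing the equivalence (not part of this change; relies on dict insertion order, Python 3.7+):

```python
# Hypothetical equivalent of the deleted get_schema() method: maps the old
# dict-based schema to the 'name:TYPE, ...' string form that the built-in
# beam.io.WriteToBigQuery takes directly.
def schema_dict_to_string(schema):
  return ', '.join('%s:%s' % (col, typ) for col, typ in schema.items())

assert schema_dict_to_string(
    {'team': 'STRING', 'total_score': 'INTEGER', 'window_start': 'STRING'}
) == 'team:STRING, total_score:INTEGER, window_start:STRING'
```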
81 changes: 26 additions & 55 deletions sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -171,43 +171,6 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
}


class WriteToBigQuery(beam.PTransform):
"""Generate, format, and write BigQuery table row information."""
def __init__(self, table_name, dataset, schema):
"""Initializes the transform.
Args:
table_name: Name of the BigQuery table to use.
dataset: Name of the dataset to use.
schema: Dictionary in the format {'column_name': 'bigquery_type'}
"""
super(WriteToBigQuery, self).__init__()
self.table_name = table_name
self.dataset = dataset
self.schema = schema

def get_schema(self):
"""Build the output table schema."""
return ', '.join(
'%s:%s' % (col, self.schema[col]) for col in self.schema)

def get_table(self, pipeline):
"""Utility to construct an output table reference."""
project = pipeline.options.view_as(GoogleCloudOptions).project
return '%s:%s.%s' % (project, self.dataset, self.table_name)

def expand(self, pcoll):
table = self.get_table(pcoll.pipeline)
return (
pcoll
| 'ConvertToRow' >> beam.Map(
lambda elem: {col: elem[col] for col in self.schema})
| beam.io.Write(beam.io.BigQuerySink(
table,
schema=self.get_schema(),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))


# [START window_and_trigger]
class CalculateTeamScores(beam.PTransform):
"""Calculates scores for each team within the configured window duration.
@@ -294,7 +257,8 @@ def run(argv=None):
options = PipelineOptions(pipeline_args)

# We also require the --project option to access --dataset
if options.view_as(GoogleCloudOptions).project is None:
project = options.view_as(GoogleCloudOptions).project
if project is None:
parser.print_usage()
print(sys.argv[0] + ': error: argument --project is required')
sys.exit(1)
@@ -306,6 +270,8 @@ def run(argv=None):
# Enforce that this pipeline is always run in streaming mode
options.view_as(StandardOptions).streaming = True

table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name)

with beam.Pipeline(options=options) as p:
# Read game events from Pub/Sub using custom timestamps, which are extracted
# from the pubsub data elements, and parse the data.
@@ -316,32 +282,37 @@
| 'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))

team_table_spec = table_spec_prefix + '_teams'
team_table_schema = (
'team:STRING, '
'total_score:INTEGER, '
'window_start:STRING, '
'processing_time:STRING')

# Get team scores and write the results to BigQuery
(events # pylint: disable=expression-not-assigned
| 'CalculateTeamScores' >> CalculateTeamScores(
args.team_window_duration, args.allowed_lateness)
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name + '_teams', args.dataset, {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
}))

def format_user_score_sums(user_score):
(user, score) = user_score
return {'user': user, 'total_score': score}
| 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
team_table_spec,
schema=team_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

user_table_spec = table_spec_prefix + '_users'
user_table_schema = 'user:STRING, total_score:INTEGER'

# Get user scores and write the results to BigQuery
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
| 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
| 'WriteUserScoreSums' >> WriteToBigQuery(
args.table_name + '_users', args.dataset, {
'user': 'STRING',
'total_score': 'INTEGER',
}))
| 'FormatUserScoreSums' >> beam.Map(
lambda elem: {'user': elem[0], 'total_score': elem[1]})
| 'WriteUserScoreSums' >> beam.io.WriteToBigQuery(
user_table_spec,
schema=user_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


if __name__ == '__main__':
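leader_board.py also inlines the removed `format_user_score_sums` helper as a lambda. A quick DirectRunner sanity check (not part of this change, using the SDK's standard test utilities) that the lambda produces the same rows the named helper did:

```python
# Local check that the inline lambda formats (user, score) tuples exactly
# as the removed format_user_score_sums helper did.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
  formatted = (
      p
      | beam.Create([('alice', 17), ('bob', 5)])
      | 'FormatUserScoreSums' >> beam.Map(
          lambda elem: {'user': elem[0], 'total_score': elem[1]}))
  assert_that(formatted, equal_to([
      {'user': 'alice', 'total_score': 17},
      {'user': 'bob', 'total_score': 5}]))
```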