Merged
1 change: 1 addition & 0 deletions CHANGES.md
@@ -32,6 +32,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Add support for streaming writes in IOBase (Python).

## New Features / Improvements

274 changes: 259 additions & 15 deletions sdks/python/apache_beam/io/iobase.py
@@ -42,6 +42,7 @@
from typing import Tuple
from typing import Union

import apache_beam as beam
from apache_beam import coders
from apache_beam import pvalue
from apache_beam.coders.coders import _MemoizingPickleCoder
@@ -56,6 +57,7 @@
from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.transforms import window
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.display import HasDisplayData
from apache_beam.utils import timestamp
@@ -751,6 +753,9 @@ class Sink(HasDisplayData):
# Whether Beam should skip writing any shards if all are empty.
skip_if_empty = False

# The write triggering frequency, in seconds, used for streaming writes
# when the input is in the GlobalWindow.
triggering_frequency = 0

def initialize_write(self):
"""Initializes the sink before writing begins.

@@ -779,7 +784,7 @@ def open_writer(self, init_result, uid):
raise NotImplementedError

def pre_finalize(self, init_result, writer_results):
"""Pre-finalization stage for sink.
"""Pre-finalization stage for sink for bounded PCollections.

Called after all bundle writes are complete and before finalize_write.
Used to setup and verify filesystem and sink states.
@@ -797,8 +802,28 @@ def pre_finalize(self, init_result, writer_results):
"""
raise NotImplementedError

def pre_finalize_windowed(self, init_result, writer_results, window=None):
"""Pre-finalization stage for sink for unbounded PCollections.

Called after all bundle writes are complete and before finalize_write.
Used to setup and verify filesystem and sink states.

Args:
init_result: the result of ``initialize_write()`` invocation.
writer_results: an iterable containing results of ``Writer.close()``
invocations. This will only contain results of successful writes, and
will only contain the result of a single successful write for a given
bundle.
window: the window whose writes are being pre-finalized, as provided
by DoFn.WindowParam.

Returns:
An object that contains any sink specific state generated.
This object will be passed to finalize_windowed_write().
"""
raise NotImplementedError

def finalize_write(self, init_result, writer_results, pre_finalize_result):
"""Finalizes the sink after all data is written to it.
"""Finalizes the sink after all data is written to it. (batch)

Given the result of initialization and an iterable of results from bundle
writes, performs finalization after writing and closes the sink. Called
@@ -833,6 +858,22 @@ def finalize_write(self, init_result, writer_results, pre_finalize_result):
"""
raise NotImplementedError

def finalize_windowed_write(
Collaborator comment: I'd prefer that the two new methods be named consistently, so maybe we can rename this one to finalize_write_windowed?

Collaborator comment: Have you considered renaming this function?

self, init_result, writer_results, pre_finalize_result, window=None):
"""Finalizes the sink after all data is written to it for a window. Similar
to ``finalize_write()``.

Args:
init_result: the result of ``initialize_write()`` invocation.
writer_results: an iterable containing results of ``Writer.close()``
invocations. This will only contain results of successful writes, and
will only contain the result of a single successful write for a given
bundle.
pre_finalize_result: the result of ``pre_finalize()`` invocation.
window: the window being finalized, as provided by DoFn.WindowParam.
"""
raise NotImplementedError
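
For context, a minimal sketch of how a Sink subclass might implement the two
new windowed hooks; the class name, its state, and the _move_to_output helper
are illustrative assumptions, not part of this PR:

class _DemoStreamingSink(Sink):
  # Emit a pane of output every 60 seconds (enables the unbounded write path).
  triggering_frequency = 60

  def pre_finalize_windowed(self, init_result, writer_results, window=None):
    # e.g. record which temp shards exist for this window before committing.
    return {'window': window, 'temp_shards': list(writer_results)}

  def finalize_windowed_write(
      self, init_result, writer_results, pre_finalize_result, window=None):
    # e.g. move each temp shard to its final, window-scoped location.
    for shard in writer_results:
      self._move_to_output(shard, window)  # hypothetical helper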


class Writer(object):
"""This class is deprecated, no backwards-compatibility guarantees.
@@ -1126,12 +1167,13 @@ def __init__(self, sink: Sink) -> None:
super().__init__()
self.sink = sink

def expand(self, pcoll):
def _expand_bounded(self, pcoll, min_shards):
"""Handles the expansion logic for a bounded PCollection."""
do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
init_result_coll = do_once | 'InitializeWrite' >> core.Map(
lambda _, sink: sink.initialize_write(), self.sink)
if getattr(self.sink, 'num_shards', 0):
min_shards = self.sink.num_shards

if min_shards >= 1:
if min_shards == 1:
keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
else:
@@ -1142,8 +1184,11 @@ def expand(self, pcoll):
| core.GroupByKey()
| 'WriteBundles' >> core.ParDo(
_WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll)))
else:
else:  # min_shards is 0; num_shards was not set on the sink
_LOGGER.info(
"WriteImpl: num_shards is unset; defaulting to 1 shard, writing per bundle.")
min_shards = 1

write_result_coll = (
pcoll
| core.WindowInto(window.GlobalWindows())
@@ -1152,22 +1197,109 @@
| 'Pair' >> core.Map(lambda x: (None, x))
| core.GroupByKey()
| 'Extract' >> core.FlatMap(lambda x: x[1]))
# PreFinalize should run before FinalizeWrite, and the two should not be
# fused.

pre_finalize_coll = (
do_once
| 'PreFinalize' >> core.FlatMap(
_pre_finalize,
self.sink,
AsSingleton(init_result_coll),
AsIter(write_result_coll)))
return do_once | 'FinalizeWrite' >> core.FlatMap(
_finalize_write,
self.sink,
AsSingleton(init_result_coll),
AsIter(write_result_coll),
min_shards,
AsSingleton(pre_finalize_coll)).with_output_types(str)
return (
do_once | 'FinalizeWrite' >> core.FlatMap(
_finalize_write,
self.sink,
AsSingleton(init_result_coll),
AsIter(write_result_coll),
min_shards,
AsSingleton(pre_finalize_coll)).with_output_types(str))
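
A short usage sketch of the bounded path above; MyFileSink and its constructor
arguments are hypothetical stand-ins for any Sink subclass that sets
num_shards:

sink = MyFileSink(file_path_prefix='/tmp/out', num_shards=3)  # hypothetical
outputs = bounded_pcoll | 'Write' >> Write(sink)  # takes the bounded path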

def _apply_windowing(self, pcoll):
"""
Applies windowing to an unbounded PCollection based on the sink's
triggering frequency.
"""
use_fixed_windows = (
self.sink.triggering_frequency is not None and
self.sink.triggering_frequency > 0)

if isinstance(pcoll.windowing.windowfn, window.GlobalWindows):
if not use_fixed_windows:
raise ValueError(
'To write a GlobalWindow unbounded PCollection, '
'triggering_frequency must be set and be greater than 0')
return pcoll | 'ApplyFixedWindows' >> core.WindowInto(
window.FixedWindows(self.sink.triggering_frequency),
trigger=beam.transforms.trigger.AfterWatermark(),
accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
allowed_lateness=beam.utils.timestamp.Duration(seconds=0))

# Keep user-defined windowing unless triggering_frequency is specified.
if use_fixed_windows:
return pcoll | 'ApplyFixedWindows' >> core.WindowInto(
window.FixedWindows(self.sink.triggering_frequency),
trigger=beam.transforms.trigger.AfterWatermark(),
accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
allowed_lateness=beam.utils.timestamp.Duration(seconds=0))

return pcoll # Keep original windowing
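
For a GlobalWindow unbounded input with sink.triggering_frequency = 60, the
logic above is roughly equivalent to the user applying the following transform
themselves (a sketch, not code from this PR; unbounded_pcoll is assumed):

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.utils.timestamp import Duration

windowed = (
    unbounded_pcoll
    | beam.WindowInto(
        window.FixedWindows(60),
        trigger=trigger.AfterWatermark(),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
        allowed_lateness=Duration(seconds=0)))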

def _expand_unbounded(self, pcoll, min_shards):
"""Handles the expansion logic for an unbounded PCollection."""
windowed_pcoll = self._apply_windowing(pcoll)

if self.sink.convert_fn is not None:
windowed_pcoll = windowed_pcoll | core.ParDo(self.sink.convert_fn)

init_result_window_coll = (
windowed_pcoll
| 'PairInit' >> core.Map(lambda x: (None, x))
| 'GroupInit' >> core.GroupByKey()
| 'InitializeWindowedWrite' >> core.Map(
lambda _, sink: sink.initialize_write(), self.sink))

if min_shards > 1:
keyed_pcoll = windowed_pcoll | core.ParDo(
_RoundRobinKeyFn(), count=min_shards)
grouped_pcoll = keyed_pcoll | 'Group by random key' >> core.GroupByKey()
else:  # min_shards is 0 or 1; use a single shard per window
min_shards = 1
_LOGGER.info("Writing unbounded data with a single shard per window.")
keyed_pcoll = windowed_pcoll | core.Map(lambda x: (None, x))
grouped_pcoll = keyed_pcoll | 'Group by window' >> core.GroupByKey()

write_result_coll = (
grouped_pcoll
| 'WriteWindowedBundles' >> core.ParDo(
_WriteWindowedBundleDoFn(sink=self.sink, per_key=True),
AsSingleton(init_result_window_coll))
| 'PairFinal' >> core.Map(lambda x: (None, x))
| 'GroupFinal' >> core.GroupByKey()
| 'ExtractFinal' >> core.Map(lambda x: x[1]))

pre_finalized_write_result_coll = (
write_result_coll
| 'PreFinalize' >> core.ParDo(
_PreFinalizeWindowedBundleDoFn(self.sink),
AsSingleton(init_result_window_coll)))

return (
pre_finalized_write_result_coll
| 'FinalizeWrite' >> core.FlatMap(
_finalize_windowed_write,
self.sink,
AsSingleton(init_result_window_coll),
AsSingleton(write_result_coll),
min_shards,
AsIter(pre_finalized_write_result_coll)).with_output_types(str))

def expand(self, pcoll):
min_shards = getattr(self.sink, 'num_shards', 0)

if pcoll.is_bounded:
return self._expand_bounded(pcoll, min_shards)
else:
return self._expand_unbounded(pcoll, min_shards)
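
A sketch of a streaming pipeline that exercises the new unbounded path;
MyStreamingSink is a hypothetical Sink subclass with triggering_frequency set,
and the Pub/Sub source is only illustrative; any unbounded source would do:

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
  _ = (
      p
      | 'Read' >> beam.io.ReadFromPubSub(topic='projects/p/topics/t')
      | 'Write' >> iobase.Write(MyStreamingSink()))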


class _WriteBundleDoFn(core.DoFn):
@@ -1199,6 +1331,94 @@ def finish_bundle(self):
window.GlobalWindow().max_timestamp(), [window.GlobalWindow()])


class _PreFinalizeWindowedBundleDoFn(core.DoFn):
"""A DoFn for writing elements to an iobase.Writer.
Opens a writer at the first element and closes the writer at finish_bundle().
"""
def __init__(
self,
sink,
destination_fn=None,
temp_directory=None,
):
self.sink = sink
self._temp_directory = temp_directory
self.destination_fn = destination_fn

def display_data(self):
return {'sink_dd': self.sink}

def process(
self,
element,
init_result,
w=core.DoFn.WindowParam,
pane=core.DoFn.PaneInfoParam):
self.sink.pre_finalize_windowed(
init_result=init_result, writer_results=element, window=w)
yield element


class _WriteWindowedBundleDoFn(core.DoFn):
"""A DoFn for writing elements to an iobase.Writer.
Opens a writer at the first element and closes the writer at finish_bundle().
"""
def __init__(self, sink, per_key=False):
self.sink = sink
self.per_key = per_key

def display_data(self):
return {'sink_dd': self.sink}

def start_bundle(self):
self.writer = {}
self.window = {}
self.init_result = {}

def process(
self,
element,
init_result,
w=core.DoFn.WindowParam,
pane=core.DoFn.PaneInfoParam):
if self.per_key:
w_key = "%s_%s" % (w, element[0]) # key
else:
w_key = w

if w_key not in self.writer:
# We ignore UUID collisions here since they are extremely rare.
self.window[w_key] = w
self.writer[w_key] = self.sink.open_writer(
init_result, '%s_%s' % (w_key, uuid.uuid4()))
self.init_result[w_key] = init_result

if self.per_key:
for e in element[1]:  # element is a (shard_key, values) pair
self.writer[w_key].write(e)
else:
self.writer[w_key].write(element)
if self.writer[w_key].at_capacity():
yield self.writer[w_key].close()
self.writer[w_key] = None

def finish_bundle(self):
for w_key, writer in self.writer.items():
w = self.window[w_key]
if writer is not None:
closed = writer.temp_shard_path
try:
closed = writer.close()  # TODO: improve sink closing for streaming
except ValueError as exp:
_LOGGER.info(
"_WriteWindowedBundleDoFn.finish_bundle: error closing writer: %s", exp)
yield WindowedValue(
closed,
timestamp=w.start,
windows=[w]  # TODO(pabloem): how do we get the pane?
)
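
To make the bookkeeping above concrete: with per_key=True, each element
arriving from the upstream GroupByKey is a (shard_key, values) pair, and one
writer is kept per (window, shard key) combination. An illustrative shape
(values invented):

element = (2, ['a', 'b', 'c'])  # (shard_key, grouped values)
shard_key, values = element
# w_key == '%s_%s' % (current_window, shard_key); 'a', 'b' and 'c' are all
# written through the single writer registered under that key.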


class _WriteKeyedBundleDoFn(core.DoFn):
def __init__(self, sink):
self.sink = sink
@@ -1241,6 +1461,30 @@ def _finalize_write(
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)


def _finalize_windowed_write(
unused_element,
sink,
init_result,
write_results,
min_shards,
pre_finalize_results,
w=DoFn.WindowParam):
write_results = list(write_results)
extra_shards = []
if len(write_results) < min_shards:
if write_results or not sink.skip_if_empty:
_LOGGER.debug(
'Creating %s empty shard(s).', min_shards - len(write_results))
for _ in range(min_shards - len(write_results)):
writer = sink.open_writer(init_result, str(uuid.uuid4()))
extra_shards.append(writer.close())
outputs = sink.finalize_windowed_write(
init_result, write_results + extra_shards, pre_finalize_results, w)

if outputs:
return (window.TimestampedValue(v, w.end) for v in outputs)


class _RoundRobinKeyFn(core.DoFn):
def start_bundle(self):
self.counter = None
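
The diff truncates _RoundRobinKeyFn here; a typical round-robin keying DoFn
looks like the following sketch (an assumption about the elided body, modeled
on the start_bundle shown above):

import random

class _RoundRobinKeyFnSketch(core.DoFn):
  def start_bundle(self):
    self.counter = None

  def process(self, element, count):
    if self.counter is None:
      # Start at a random offset so shards fill evenly across bundles.
      self.counter = random.randrange(count)
    self.counter = (self.counter + 1) % count
    yield self.counter, element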