diff --git a/CHANGES.md b/CHANGES.md
index 4d4aee7ed25d..f69daf957164 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -32,6 +32,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Add support for streaming writes in IOBase (Python).
 
 ## New Features / Improvements
 
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 53215275e050..67d6cd358a07 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -56,6 +57,7 @@
 from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
+from apache_beam.transforms import trigger
 from apache_beam.transforms import window
+from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.utils import timestamp
@@ -751,6 +753,9 @@ class Sink(HasDisplayData):
   # Whether Beam should skip writing any shards if all are empty.
   skip_if_empty = False
 
+  # Write triggering frequency, in seconds, for unbounded GlobalWindow input.
+  triggering_frequency = 0
+
   def initialize_write(self):
     """Initializes the sink before writing begins.
 
@@ -779,7 +784,7 @@ def open_writer(self, init_result, uid):
     raise NotImplementedError
 
   def pre_finalize(self, init_result, writer_results):
-    """Pre-finalization stage for sink.
+    """Pre-finalization stage for the sink, for bounded PCollections.
 
     Called after all bundle writes are complete and before finalize_write.
     Used to setup and verify filesystem and sink states.
@@ -797,8 +802,28 @@ def pre_finalize(self, init_result, writer_results):
     """
     raise NotImplementedError
 
+  def pre_finalize_windowed(self, init_result, writer_results, window=None):
+    """Pre-finalization stage for the sink, for unbounded PCollections.
+
+    Called after all bundle writes for a window are complete and before
+    ``finalize_windowed_write()``. Used to set up and verify filesystem and
+    sink states.
+
+    Args:
+      init_result: the result of ``initialize_write()`` invocation.
+      writer_results: an iterable containing results of ``Writer.close()``
+        invocations. This will only contain results of successful writes, and
+        will only contain the result of a single successful write for a given
+        bundle.
+      window: the window being processed, as provided by ``DoFn.WindowParam``.
+
+    Returns:
+      An object that contains any sink specific state generated.
+      This object will be passed to ``finalize_windowed_write()``.
+    """
+    raise NotImplementedError
+
   def finalize_write(self, init_result, writer_results, pre_finalize_result):
-    """Finalizes the sink after all data is written to it.
+    """Finalizes the sink after all data is written to it (batch).
 
     Given the result of initialization and an iterable of results from bundle
     writes, performs finalization after writing and closes the sink. Called
@@ -833,6 +858,22 @@ def finalize_write(self, init_result, writer_results, pre_finalize_result):
     """
     raise NotImplementedError
 
+  def finalize_windowed_write(
+      self, init_result, writer_results, pre_finalize_result, window=None):
+    """Finalizes the sink after all data for a window is written to it.
+
+    Similar to ``finalize_write()``, but invoked once per window when writing
+    an unbounded PCollection.
+
+    Args:
+      init_result: the result of ``initialize_write()`` invocation.
+      writer_results: an iterable containing results of ``Writer.close()``
+        invocations. This will only contain results of successful writes, and
+        will only contain the result of a single successful write for a given
+        bundle.
+      pre_finalize_result: the result of ``pre_finalize_windowed()``
+        invocation.
+      window: the window being finalized, as provided by ``DoFn.WindowParam``.
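+
+    Example (a sketch only; ``MyStreamingSink`` and ``_commit_shard`` are
+    illustrative names, not part of this API)::
+
+      class MyStreamingSink(Sink):
+        def finalize_windowed_write(
+            self, init_result, writer_results, pre_finalize_result,
+            window=None):
+          # Commit the temporary shards written for this window and return
+          # the final shard names.
+          return [
+              self._commit_shard(result, window) for result in writer_results]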
+    """
+    raise NotImplementedError
+
 
 class Writer(object):
   """This class is deprecated, no backwards-compatibility guarantees.
@@ -1126,12 +1167,13 @@ def __init__(self, sink: Sink) -> None:
     super().__init__()
     self.sink = sink
 
-  def expand(self, pcoll):
+  def _expand_bounded(self, pcoll, min_shards):
+    """Handles the expansion logic for a bounded PCollection."""
     do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
     init_result_coll = do_once | 'InitializeWrite' >> core.Map(
         lambda _, sink: sink.initialize_write(), self.sink)
-    if getattr(self.sink, 'num_shards', 0):
-      min_shards = self.sink.num_shards
+
+    if min_shards >= 1:
       if min_shards == 1:
         keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
       else:
         keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards)
       write_result_coll = (
           keyed_pcoll
           | core.GroupByKey()
           | 'WriteBundles' >> core.ParDo(
               _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll)))
@@ -1142,8 +1184,11 @@ def expand(self, pcoll):
-    else:
+    else:  # num_shards was not set; default to one shard, written per bundle.
+      _LOGGER.debug(
+          'num_shards is not set; defaulting to min_shards=1 and writing one '
+          'shard per bundle.')
       min_shards = 1
+
       write_result_coll = (
           pcoll
           | core.WindowInto(window.GlobalWindows())
           | 'WriteBundles' >> core.ParDo(
               _WriteBundleDoFn(self.sink), AsSingleton(init_result_coll))
           | 'Pair' >> core.Map(lambda x: (None, x))
           | core.GroupByKey()
           | 'Extract' >> core.FlatMap(lambda x: x[1]))
     # PreFinalize should run before FinalizeWrite, and the two should not be
     # fused.
     pre_finalize_coll = (
         do_once
         | 'PreFinalize' >> core.FlatMap(
             _pre_finalize,
             self.sink,
             AsSingleton(init_result_coll),
             AsIter(write_result_coll)))
@@ -1161,13 +1205,101 @@
-    return do_once | 'FinalizeWrite' >> core.FlatMap(
-        _finalize_write,
-        self.sink,
-        AsSingleton(init_result_coll),
-        AsIter(write_result_coll),
-        min_shards,
-        AsSingleton(pre_finalize_coll)).with_output_types(str)
+    return (
+        do_once | 'FinalizeWrite' >> core.FlatMap(
+            _finalize_write,
+            self.sink,
+            AsSingleton(init_result_coll),
+            AsIter(write_result_coll),
+            min_shards,
+            AsSingleton(pre_finalize_coll)).with_output_types(str))
+
+  def _apply_windowing(self, pcoll):
+    """Applies windowing to an unbounded PCollection based on the sink's
+    triggering frequency.
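+
+    For example (a sketch): with ``triggering_frequency=60``, an unbounded,
+    globally windowed PCollection is re-windowed roughly as::
+
+      pcoll | core.WindowInto(
+          window.FixedWindows(60),
+          trigger=trigger.AfterWatermark(),
+          accumulation_mode=trigger.AccumulationMode.DISCARDING,
+          allowed_lateness=timestamp.Duration(seconds=0))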
+    """
+    use_fixed_windows = (
+        self.sink.triggering_frequency is not None and
+        self.sink.triggering_frequency > 0)
+
+    if isinstance(pcoll.windowing.windowfn, window.GlobalWindows):
+      if not use_fixed_windows:
+        raise ValueError(
+            'To write an unbounded PCollection in the GlobalWindow, '
+            'triggering_frequency must be set to a value greater than 0.')
+
+    # Keep user-defined windowing unless triggering_frequency is specified.
+    if use_fixed_windows:
+      return pcoll | 'ApplyFixedWindows' >> core.WindowInto(
+          window.FixedWindows(self.sink.triggering_frequency),
+          trigger=trigger.AfterWatermark(),
+          accumulation_mode=trigger.AccumulationMode.DISCARDING,
+          allowed_lateness=timestamp.Duration(seconds=0))
+
+    return pcoll  # Keep the original windowing.
+
+  def _expand_unbounded(self, pcoll, min_shards):
+    """Handles the expansion logic for an unbounded PCollection."""
+    windowed_pcoll = self._apply_windowing(pcoll)
+
+    if getattr(self.sink, 'convert_fn', None) is not None:
+      windowed_pcoll = windowed_pcoll | core.ParDo(self.sink.convert_fn)
+
+    init_result_window_coll = (
+        windowed_pcoll
+        | 'PairInit' >> core.Map(lambda x: (None, x))
+        | 'GroupInit' >> core.GroupByKey()
+        | 'InitializeWindowedWrite' >> core.Map(
+            lambda _, sink: sink.initialize_write(), self.sink))
+
+    if min_shards > 1:
+      keyed_pcoll = windowed_pcoll | core.ParDo(
+          _RoundRobinKeyFn(), count=min_shards)
+      grouped_pcoll = keyed_pcoll | 'Group by random key' >> core.GroupByKey()
+    else:  # min_shards is 0 or 1; use a single shard per window.
+      min_shards = 1
+      _LOGGER.info('Writing unbounded data with a single shard per window.')
+      keyed_pcoll = windowed_pcoll | core.Map(lambda x: (None, x))
+      grouped_pcoll = keyed_pcoll | 'Group by window' >> core.GroupByKey()
+
+    write_result_coll = (
+        grouped_pcoll
+        | 'WriteWindowedBundles' >> core.ParDo(
+            _WriteWindowedBundleDoFn(sink=self.sink, per_key=True),
+            AsSingleton(init_result_window_coll))
+        | 'PairFinal' >> core.Map(lambda x: (None, x))
+        | 'GroupFinal' >> core.GroupByKey()
+        | 'ExtractFinal' >> core.Map(lambda x: x[1]))
+
+    pre_finalized_write_result_coll = (
+        write_result_coll
+        | 'PreFinalize' >> core.ParDo(
+            _PreFinalizeWindowedBundleDoFn(self.sink),
+            AsSingleton(init_result_window_coll)))
+
+    return (
+        pre_finalized_write_result_coll
+        | 'FinalizeWrite' >> core.FlatMap(
+            _finalize_windowed_write,
+            self.sink,
+            AsSingleton(init_result_window_coll),
+            AsSingleton(write_result_coll),
+            min_shards,
+            AsIter(pre_finalized_write_result_coll)).with_output_types(str))
+
+  def expand(self, pcoll):
+    min_shards = getattr(self.sink, 'num_shards', 0)
+    if pcoll.is_bounded:
+      return self._expand_bounded(pcoll, min_shards)
+    else:
+      return self._expand_unbounded(pcoll, min_shards)
 
 
 class _WriteBundleDoFn(core.DoFn):
@@ -1199,6 +1331,94 @@ def finish_bundle(self):
           window.GlobalWindow().max_timestamp(), [window.GlobalWindow()])
 
 
+class _PreFinalizeWindowedBundleDoFn(core.DoFn):
+  """A DoFn that runs the sink's ``pre_finalize_windowed()`` step on the
+  write results of each window.
+  """
+  def __init__(
+      self,
+      sink,
+      destination_fn=None,
+      temp_directory=None,
+  ):
+    self.sink = sink
+    self._temp_directory = temp_directory
+    self.destination_fn = destination_fn
+
+  def display_data(self):
+    return {'sink_dd': self.sink}
+
+  def process(
+      self,
+      element,
+      init_result,
+      w=core.DoFn.WindowParam,
+      pane=core.DoFn.PaneInfoParam):
+    self.sink.pre_finalize_windowed(
+        init_result=init_result, writer_results=element, window=w)
+    yield element
+
+
+class _WriteWindowedBundleDoFn(core.DoFn):
+  """A DoFn for writing elements of an unbounded PCollection to an
+  iobase.Writer.
+
+  Opens one writer per window (and per key when ``per_key`` is True) at the
+  first element seen for it, and closes any remaining writers at
+  ``finish_bundle()``.
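+
+  For example (a sketch), with ``per_key=True`` each input element is a
+  ``(key, values)`` pair produced by a GroupByKey. In window ``W``, the
+  element ``('shard-0', [rec1, rec2])`` opens (or reuses) the writer keyed by
+  ``'W_shard-0'``::
+
+    writer = sink.open_writer(init_result, 'W_shard-0_<uuid>')
+    writer.write(rec1)
+    writer.write(rec2)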
+  """
+  def __init__(self, sink, per_key=False):
+    self.sink = sink
+    self.per_key = per_key
+
+  def display_data(self):
+    return {'sink_dd': self.sink}
+
+  def start_bundle(self):
+    self.writer = {}
+    self.window = {}
+    self.init_result = {}
+
+  def process(
+      self,
+      element,
+      init_result,
+      w=core.DoFn.WindowParam,
+      pane=core.DoFn.PaneInfoParam):
+    if self.per_key:
+      w_key = '%s_%s' % (w, element[0])  # (window, key)
+    else:
+      w_key = w
+
+    if w_key not in self.writer or self.writer[w_key] is None:
+      # We ignore UUID collisions here since they are extremely rare.
+      self.window[w_key] = w
+      self.writer[w_key] = self.sink.open_writer(
+          init_result, '%s_%s' % (w_key, uuid.uuid4()))
+      self.init_result[w_key] = init_result
+
+    if self.per_key:
+      for e in element[1]:  # Write each value of the grouped element.
+        self.writer[w_key].write(e)
+    else:
+      self.writer[w_key].write(element)
+
+    if self.writer[w_key].at_capacity():
+      yield self.writer[w_key].close()
+      self.writer[w_key] = None
+
+  def finish_bundle(self):
+    for w_key, writer in self.writer.items():
+      w = self.window[w_key]
+      if writer is not None:
+        closed = writer.temp_shard_path
+        try:
+          # TODO: improve sink closing for streaming.
+          closed = writer.close()
+        except ValueError as exp:
+          _LOGGER.warning(
+              '_WriteWindowedBundleDoFn failed to close the writer for '
+              'window %s: %s', w, exp)
+        yield WindowedValue(
+            closed,
+            timestamp=w.start,
+            # TODO(pabloem): propagate the pane info here.
+            windows=[w])
+
+
 class _WriteKeyedBundleDoFn(core.DoFn):
   def __init__(self, sink):
     self.sink = sink
@@ -1241,6 +1461,30 @@ def _finalize_write(
           window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
 
 
+def _finalize_windowed_write(
+    unused_element,
+    sink,
+    init_result,
+    write_results,
+    min_shards,
+    pre_finalize_results,
+    w=DoFn.WindowParam):
+  write_results = list(write_results)
+  extra_shards = []
+  if len(write_results) < min_shards:
+    if write_results or not sink.skip_if_empty:
+      _LOGGER.debug(
+          'Creating %s empty shard(s).', min_shards - len(write_results))
+      for _ in range(min_shards - len(write_results)):
+        writer = sink.open_writer(init_result, str(uuid.uuid4()))
+        extra_shards.append(writer.close())
+  outputs = sink.finalize_windowed_write(
+      init_result, write_results + extra_shards, pre_finalize_results, w)
+
+  if outputs:
+    return (window.TimestampedValue(v, w.end) for v in outputs)
+
+
 class _RoundRobinKeyFn(core.DoFn):
   def start_bundle(self):
     self.counter = None