From 1f52da73eb38dac99bd8b02fdd0378315fa910fb Mon Sep 17 00:00:00 2001
From: Razvan Culea
Date: Tue, 3 Jun 2025 16:47:59 +0000
Subject: [PATCH 1/9] Add support for streaming writes in IOBase (Python)

---
 CHANGES.md                           |   1 +
 sdks/python/apache_beam/io/iobase.py | 380 ++++++++++++++++++++++++---
 2 files changed, 344 insertions(+), 37 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index cff581230b7d..5f6fe19e1882 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -32,6 +32,7 @@
 ## I/Os

 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Add support for streaming writes in IOBase (Python).

 ## New Features / Improvements

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 53215275e050..83857486356c 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -42,6 +42,7 @@
 from typing import Tuple
 from typing import Union

+import apache_beam as beam
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.coders.coders import _MemoizingPickleCoder
@@ -56,6 +57,7 @@
 from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
+from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.utils import timestamp
@@ -779,7 +781,7 @@ def open_writer(self, init_result, uid):
     raise NotImplementedError

   def pre_finalize(self, init_result, writer_results):
-    """Pre-finalization stage for sink.
+    """Pre-finalization stage for sink for bounded PCollections.

     Called after all bundle writes are complete and before finalize_write.
     Used to setup and verify filesystem and sink states.
@@ -797,8 +799,65 @@ def pre_finalize(self, init_result, writer_results):
     """
     raise NotImplementedError

+  def pre_finalize_windowed(self, init_result, writer_results, window=None):
+    """Pre-finalization stage for sink for unbounded PCollections.
+
+    Called after all bundle writes are complete and before finalize_write.
+    Used to set up and verify filesystem and sink states.
+
+    Args:
+      init_result: the result of ``initialize_write()`` invocation.
+      writer_results: an iterable containing results of ``Writer.close()``
+        invocations. This will only contain results of successful writes, and
+        will only contain the result of a single successful write for a given
+        bundle.
+      window: the window whose writes are being pre-finalized.
+
+    Returns:
+      An object that contains any sink specific state generated.
+      This object will be passed to finalize_windowed_write().
+    """
+    raise NotImplementedError
+
   def finalize_write(self, init_result, writer_results, pre_finalize_result):
-    """Finalizes the sink after all data is written to it.
+    """Finalizes the sink after all data is written to it (batch).
+
+    Given the result of initialization and an iterable of results from bundle
+    writes, performs finalization after writing and closes the sink. Called
+    after all bundle writes are complete.
+
+    The bundle write results that are passed to finalize are those returned by
+    bundles that completed successfully. Although bundles may have been run
+    multiple times (for fault-tolerance), only one writer result will be passed
+    to finalize for each bundle. An implementation of finalize should perform
+    clean up of any failed and successfully retried bundles. Note that these
+    failed bundles will not have their writer result passed to finalize, so
+    finalize should be capable of locating any temporary/partial output written
+    by failed bundles.
+
+    If all retries of a bundle fail, the whole pipeline will fail *without*
+    finalize_write() being invoked.
+
+    A best practice is to make finalize atomic. If this is impossible given the
+    semantics of the sink, finalize should be idempotent, as it may be called
+    multiple times in the case of failure/retry or for redundancy.
+
+    Note that the iteration order of the writer results is not guaranteed to be
+    consistent if finalize is called multiple times.
+
+    Args:
+      init_result: the result of ``initialize_write()`` invocation.
+      writer_results: an iterable containing results of ``Writer.close()``
+        invocations. This will only contain results of successful writes, and
+        will only contain the result of a single successful write for a given
+        bundle.
+      pre_finalize_result: the result of ``pre_finalize()`` invocation.
+    """
+    raise NotImplementedError
+
+  def finalize_windowed_write(
+      self, init_result, writer_results, pre_finalize_result, w=None):
+    """Finalizes the sink after all data is written to it for a window.

    Given the result of initialization and an iterable of results from bundle
    writes, performs finalization after writing and closes the sink. Called
@@ -830,6 +889,7 @@ def finalize_write(self, init_result, writer_results, pre_finalize_result):
        will only contain the result of a single successful write for a given
        bundle.
      pre_finalize_result: the result of ``pre_finalize()`` invocation.
+      w: DoFn window
     """
     raise NotImplementedError

@@ -1127,47 +1187,183 @@ def __init__(self, sink: Sink) -> None:
     self.sink = sink

   def expand(self, pcoll):
-    do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
-    init_result_coll = do_once | 'InitializeWrite' >> core.Map(
-        lambda _, sink: sink.initialize_write(), self.sink)
+    if (pcoll.is_bounded):
+      do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
+      init_result_coll = do_once | 'InitializeWrite' >> core.Map(
+          lambda _, sink: sink.initialize_write(), self.sink)
     if getattr(self.sink, 'num_shards', 0):
       min_shards = self.sink.num_shards
-      if min_shards == 1:
-        keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
-      else:
-        keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards)
-      write_result_coll = (
-          keyed_pcoll
-          | core.WindowInto(window.GlobalWindows())
-          | core.GroupByKey()
-          | 'WriteBundles' >> core.ParDo(
-              _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll)))
+
+      if (pcoll.is_bounded):
+        if min_shards == 1:
+          keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
+        else:
+          keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards)
+        write_result_coll = (
+            keyed_pcoll
+            | core.WindowInto(window.GlobalWindows())
+            | core.GroupByKey()
+            | 'WriteBundles' >> core.ParDo(
+                _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll))
+        )
+      else:  #unbounded PCollection needs to be written per window
+        if isinstance(pcoll.windowing.windowfn, window.GlobalWindows):
+          if (self.sink.triggering_frequency is None or
+              self.sink.triggering_frequency == 0):
+            raise ValueError(
+                'To write a GlobalWindow unbounded PCollection, '
+                'triggering_frequency must be set and be greater than 0')
+          widowed_pcoll = (
+              pcoll  #TODO GroupIntoBatches and trigger indef per freq
+              | core.WindowInto(
+                  window.FixedWindows(self.sink.triggering_frequency),
+                  trigger=beam.transforms.trigger.AfterWatermark(),
accumulation_mode=beam.transforms.trigger.AccumulationMode. + DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) + else: + #keep user windowing, unless triggering_frequency has been specified + if (self.sink.triggering_frequency is not None and + self.sink.triggering_frequency > 0): + widowed_pcoll = ( + pcoll #TODO GroupIntoBatches and trigger indef per freq + | core.WindowInto( + window.FixedWindows(self.sink.triggering_frequency), + trigger=beam.transforms.trigger.AfterWatermark(), + accumulation_mode=beam.transforms.trigger.AccumulationMode. + DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) + else: #keep user windowing + widowed_pcoll = pcoll + if self.sink.convert_fn is not None: + widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) + if min_shards == 1: + keyed_pcoll = widowed_pcoll | core.Map(lambda x: (None, x)) + else: + keyed_pcoll = widowed_pcoll | core.ParDo( + _RoundRobinKeyFn(), count=min_shards) + init_result_window_coll = ( + keyed_pcoll + | 'Pair init' >> core.Map(lambda x: (None, x)) + | 'Pair init gbk' >> core.GroupByKey() + | 'InitializeWindowedWrite' >> core.Map( + lambda _, sink: sink.initialize_write(), self.sink)) + + write_result_coll = ( + keyed_pcoll + | 'Group by random key' >> core.GroupByKey() + | 'WriteWindowedBundles' >> core.ParDo( + _WriteWindowedBundleDoFn(sink=self.sink, per_key=True), + AsSingleton(init_result_window_coll)) + | 'Pair' >> core.Map(lambda x: (None, x)) + | core.GroupByKey() + | 'Extract' >> core.Map(lambda x: x[1])) + pre_finalized_write_result_coll = ( + write_result_coll + | 'PreFinalize' >> core.ParDo( + _PreFinalizeWindowedBundleDoFn(self.sink), + AsSingleton(init_result_window_coll))) + finalized_write_result_coll = ( + pre_finalized_write_result_coll + | 'FinalizeWrite' >> core.FlatMap( + _finalize_windowed_write, + self.sink, + AsSingleton(init_result_window_coll), + AsSingleton(write_result_coll), + min_shards, + AsIter(pre_finalized_write_result_coll)).with_output_types(str)) + return finalized_write_result_coll else: + _LOGGER.info( + "*** WriteImpl min_shards undef so it's 1, and we write per Bundle") min_shards = 1 - write_result_coll = ( - pcoll - | core.WindowInto(window.GlobalWindows()) - | 'WriteBundles' >> core.ParDo( - _WriteBundleDoFn(self.sink), AsSingleton(init_result_coll)) - | 'Pair' >> core.Map(lambda x: (None, x)) - | core.GroupByKey() - | 'Extract' >> core.FlatMap(lambda x: x[1])) + if (pcoll.is_bounded): + write_result_coll = ( + pcoll + | core.WindowInto(window.GlobalWindows()) + | 'WriteBundles' >> core.ParDo( + _WriteBundleDoFn(self.sink), AsSingleton(init_result_coll)) + | 'Pair' >> core.Map(lambda x: (None, x)) + | core.GroupByKey() + | 'Extract' >> core.FlatMap(lambda x: x[1])) + else: #unbounded PCollection needes to be written per window + if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): + if (self.sink.triggering_frequency is None or + self.sink.triggering_frequency == 0): + raise ValueError( + 'To write a GlobalWindow PCollection, triggering_frequency must' + ' be set and be greater than 0') + widowed_pcoll = ( + pcoll #TODO GroupIntoBatches and trigger indef per freq + | core.WindowInto( + window.FixedWindows(self.sink.triggering_frequency), + trigger=beam.transforms.trigger.AfterWatermark(), + accumulation_mode=beam.transforms.trigger.AccumulationMode. 
+                  DISCARDING,
+                  allowed_lateness=beam.utils.timestamp.Duration(seconds=0)))
+        else:
+          #keep user windowing, unless triggering_frequency has been specified
+          if (self.sink.triggering_frequency is not None and
+              self.sink.triggering_frequency > 0):
+            widowed_pcoll = (
+                pcoll  #TODO GroupIntoBatches and trigger indef per freq
+                | core.WindowInto(
+                    window.FixedWindows(self.sink.triggering_frequency),
+                    trigger=beam.transforms.trigger.AfterWatermark(),
+                    accumulation_mode=beam.transforms.trigger.AccumulationMode.
+                    DISCARDING,
+                    allowed_lateness=beam.utils.timestamp.Duration(seconds=0)))
+          else:  #keep user windowing
+            widowed_pcoll = pcoll
+        init_result_window_coll = (
+            widowed_pcoll
+            | 'Pair init' >> core.Map(lambda x: (None, x))
+            | 'Pair init gbk' >> core.GroupByKey()
+            | 'InitializeWindowedWrite' >> core.Map(
+                lambda _, sink: sink.initialize_write(), self.sink))
+        if self.sink.convert_fn is not None:
+          widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn)
+        write_result_coll = (
+            widowed_pcoll
+            | 'WriteWindowedBundles' >> core.ParDo(
+                _WriteWindowedBundleDoFn(self.sink),
+                AsSingleton(init_result_window_coll))
+            | 'Pair' >> core.Map(lambda x: (None, x))
+            | core.GroupByKey()
+            | 'Extract' >> core.Map(lambda x: x[1]))
+        pre_finalized_write_result_coll = (
+            write_result_coll
+            | 'PreFinalize' >> core.ParDo(
+                _PreFinalizeWindowedBundleDoFn(self.sink),
+                AsSingleton(init_result_window_coll)))
+        finalized_write_result_coll = (
+            pre_finalized_write_result_coll
+            | 'FinalizeWrite' >> core.FlatMap(
+                _finalize_windowed_write,
+                self.sink,
+                AsSingleton(init_result_window_coll),
+                AsSingleton(write_result_coll),
+                min_shards,
+                AsIter(pre_finalized_write_result_coll)).with_output_types(str))
+        return finalized_write_result_coll
     # PreFinalize should run before FinalizeWrite, and the two should not be
     # fused.
-    pre_finalize_coll = (
-        do_once
-        | 'PreFinalize' >> core.FlatMap(
-            _pre_finalize,
-            self.sink,
-            AsSingleton(init_result_coll),
-            AsIter(write_result_coll)))
-    return do_once | 'FinalizeWrite' >> core.FlatMap(
-        _finalize_write,
-        self.sink,
-        AsSingleton(init_result_coll),
-        AsIter(write_result_coll),
-        min_shards,
-        AsSingleton(pre_finalize_coll)).with_output_types(str)
+    if (pcoll.is_bounded):
+      pre_finalize_coll = (
+          do_once
+          | 'PreFinalize' >> core.FlatMap(
+              _pre_finalize,
+              self.sink,
+              AsSingleton(init_result_coll),
+              AsIter(write_result_coll)))
+      return (
+          do_once | 'FinalizeWrite' >> core.FlatMap(
+              _finalize_write,
+              self.sink,
+              AsSingleton(init_result_coll),
+              AsIter(write_result_coll),
+              min_shards,
+              AsSingleton(pre_finalize_coll)).with_output_types(str))

 class _WriteBundleDoFn(core.DoFn):
@@ -1199,6 +1395,94 @@ def finish_bundle(self):
           window.GlobalWindow().max_timestamp(), [window.GlobalWindow()])

+class _PreFinalizeWindowedBundleDoFn(core.DoFn):
+  """A DoFn that runs the sink's pre-finalization step for windowed writes.
+  Calls ``Sink.pre_finalize_windowed()`` with each window's write results.
+  """
+  def __init__(
+      self,
+      sink,
+      destination_fn=None,
+      temp_directory=None,
+  ):
+    self.sink = sink
+    self._temp_directory = temp_directory
+    self.destination_fn = destination_fn
+
+  def display_data(self):
+    return {'sink_dd': self.sink}
+
+  def process(
+      self,
+      element,
+      init_result,
+      w=core.DoFn.WindowParam,
+      pane=core.DoFn.PaneInfoParam):
+    self.sink.pre_finalize_windowed(
+        init_result=init_result, writer_results=element, window=w)
+    yield element
+
+
+class _WriteWindowedBundleDoFn(core.DoFn):
+  """A DoFn for writing elements to an iobase.Writer.
+  Opens a writer at the first element and closes the writer at finish_bundle().
+  """
+  def __init__(self, sink, per_key=False):
+    self.sink = sink
+    self.per_key = per_key
+
+  def display_data(self):
+    return {'sink_dd': self.sink}
+
+  def start_bundle(self):
+    self.writer = {}
+    self.window = {}
+    self.init_result = {}
+
+  def process(
+      self,
+      element,
+      init_result,
+      w=core.DoFn.WindowParam,
+      pane=core.DoFn.PaneInfoParam):
+    if self.per_key:
+      w_key = "%s_%s" % (w, element[0])  # key
+    else:
+      w_key = w
+
+    if self.writer.get(w_key) is None:
+      # We ignore UUID collisions here since they are extremely rare.
+      self.window[w_key] = w
+      self.writer[w_key] = self.sink.open_writer(
+          init_result, '%s_%s' % (w_key, uuid.uuid4()))
+      self.init_result[w_key] = init_result
+
+    if self.per_key:
+      for e in element[1]:  # values
+        self.writer[w_key].write(e)  # value
+    else:
+      self.writer[w_key].write(element)
+    if self.writer[w_key].at_capacity():
+      yield self.writer[w_key].close()
+      self.writer[w_key] = None
+
+  def finish_bundle(self):
+    for w_key, writer in self.writer.items():
+      w = self.window[w_key]
+      if writer is not None:
+        closed = writer.temp_shard_path
+        try:
+          closed = writer.close()  # TODO: improve sink closing for streaming
+        except ValueError as exp:
+          _LOGGER.info(
+              "*** _WriteWindowedBundleDoFn finish_bundle close failed: %s", exp)
+        yield WindowedValue(
+            closed,
+            timestamp=w.start,
+            windows=[w]  # TODO(pabloem) HOW DO WE GET THE PANE
+        )
+
+
 class _WriteKeyedBundleDoFn(core.DoFn):
   def __init__(self, sink):
     self.sink = sink
@@ -1240,6 +1524,28 @@ def _finalize_write(
   return (
       window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)

+def _finalize_windowed_write(
+    unused_element,
+    sink,
+    init_result,
+    write_results,
+    min_shards,
+    pre_finalize_results,
+    w=DoFn.WindowParam):
+  write_results = list(write_results)
+  extra_shards = []
+  if len(write_results) < min_shards:
+    if write_results or not sink.skip_if_empty:
+      _LOGGER.debug(
+          'Creating %s empty shard(s).', min_shards - len(write_results))
+      for _ in range(min_shards - len(write_results)):
+        writer = sink.open_writer(init_result, str(uuid.uuid4()))
+        extra_shards.append(writer.close())
+  outputs = sink.finalize_windowed_write(
+      init_result, write_results + extra_shards, pre_finalize_results, w)
+
+  if outputs:
+    return (window.TimestampedValue(v, w.end) for v in outputs)

 class _RoundRobinKeyFn(core.DoFn):
   def start_bundle(self):

From fe9eab211b4015672348d2aace0683631fb8e97e Mon Sep 17 00:00:00 2001
From: Razvan Culea
Date: Tue, 3 Jun 2025 17:17:36 +0000
Subject: [PATCH 2/9] add triggering_frequency in iobase.Sink

---
 sdks/python/apache_beam/io/iobase.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 83857486356c..94c42150496e 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -753,6 +753,9 @@ class Sink(HasDisplayData):
   # Whether Beam should skip writing any shards if all are empty.
   skip_if_empty = False

+  # The write triggering frequency (seconds) for streaming with GlobalWindows
+  triggering_frequency = 0
+
   def initialize_write(self):
     """Initializes the sink before writing begins.
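For orientation between patches: when `triggering_frequency` is set, the unbounded path in `WriteImpl.expand()` (PATCH 1/9 above) re-windows the input before grouping and writing. The following standalone sketch mirrors that transform for illustration only; the helper name `apply_write_windowing` is ours, not part of the patch:

```python
import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.utils import timestamp


def apply_write_windowing(pcoll, triggering_frequency):
  # Mirrors the WindowInto applied by the unbounded write path: fixed windows
  # sized by the sink's triggering_frequency (seconds), firing once when the
  # watermark passes the window end, discarding fired panes, and allowing no
  # late data.
  return pcoll | 'ApplyWriteWindowing' >> beam.WindowInto(
      window.FixedWindows(triggering_frequency),
      trigger=trigger.AfterWatermark(),
      accumulation_mode=trigger.AccumulationMode.DISCARDING,
      allowed_lateness=timestamp.Duration(seconds=0))
```

With this configuration each window fires exactly once, so every window's bundle of writes can be finalized independently, which is what `pre_finalize_windowed()` and `finalize_windowed_write()` rely on.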
From 9cc97c668e922972ae9ac45493355d6bbf0dad04 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Tue, 3 Jun 2025 19:41:12 +0000 Subject: [PATCH 3/9] fix whitespaces/newlines --- sdks/python/apache_beam/io/iobase.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 94c42150496e..092b83611530 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1527,6 +1527,7 @@ def _finalize_write( return ( window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) + def _finalize_windowed_write( unused_element, sink, @@ -1550,6 +1551,7 @@ def _finalize_windowed_write( if outputs: return (window.TimestampedValue(v, w.end) for v in outputs) + class _RoundRobinKeyFn(core.DoFn): def start_bundle(self): self.counter = None From 6d1aa55476ccd50e31a0f9b8b49f6c903206be16 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Tue, 10 Jun 2025 11:55:33 +0000 Subject: [PATCH 4/9] fixes per https://github.com/apache/beam/pull/35137#pullrequestreview-2906007316 --- sdks/python/apache_beam/io/iobase.py | 30 ++++------------------------ 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 092b83611530..60528af2f257 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -859,31 +859,9 @@ def finalize_write(self, init_result, writer_results, pre_finalize_result): raise NotImplementedError def finalize_windowed_write( - self, init_result, writer_results, pre_finalize_result, w=None): - """Finalizes the sink after all data is written to it for a window. - - Given the result of initialization and an iterable of results from bundle - writes, performs finalization after writing and closes the sink. Called - after all bundle writes are complete. - - The bundle write results that are passed to finalize are those returned by - bundles that completed successfully. Although bundles may have been run - multiple times (for fault-tolerance), only one writer result will be passed - to finalize for each bundle. An implementation of finalize should perform - clean up of any failed and successfully retried bundles. Note that these - failed bundles will not have their writer result passed to finalize, so - finalize should be capable of locating any temporary/partial output written - by failed bundles. - - If all retries of a bundle fails, the whole pipeline will fail *without* - finalize_write() being invoked. - - A best practice is to make finalize atomic. If this is impossible given the - semantics of the sink, finalize should be idempotent, as it may be called - multiple times in the case of failure/retry or for redundancy. - - Note that the iteration order of the writer results is not guaranteed to be - consistent if finalize is called multiple times. + self, init_result, writer_results, pre_finalize_result, window=None): + """Finalizes the sink after all data is written to it for a window. Similar + to ``finalize_write()``. Args: init_result: the result of ``initialize_write()`` invocation. @@ -892,7 +870,7 @@ def finalize_windowed_write( will only contain the result of a single successful write for a given bundle. pre_finalize_result: the result of ``pre_finalize()`` invocation. 
-      w: DoFn window
+      window: the window whose writes are being finalized.
     """
     raise NotImplementedError

From 60a58c5d63bb7745ad524332ba3f86967f69fffa Mon Sep 17 00:00:00 2001
From: Razvan Culea
Date: Fri, 13 Jun 2025 16:56:29 +0000
Subject: [PATCH 5/9] refactor for code readability

---
 sdks/python/apache_beam/io/iobase.py | 185 +++++++++++++++++++++++++++
 1 file changed, 185 insertions(+)

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 60528af2f257..932d4b0953a0 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1167,7 +1167,192 @@ def __init__(self, sink: Sink) -> None:
     super().__init__()
     self.sink = sink

+  def _expand_bounded(self, pcoll, min_shards):
+    """Handles the expansion logic for a bounded PCollection."""
+    do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
+    init_result_coll = do_once | 'InitializeWrite' >> core.Map(
+        lambda _, sink: sink.initialize_write(), self.sink)
+
+    if (min_shards >= 1):
+      if min_shards == 1:
+        keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
+      else:
+        keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards)
+      write_result_coll = (
+          keyed_pcoll
+          | core.WindowInto(window.GlobalWindows())
+          | core.GroupByKey()
+          | 'WriteBundles' >> core.ParDo(
+              _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll)))
+    else:  # min_shards is 0 because num_shards was undefined
+      _LOGGER.info(
+          "*** WriteImpl num_shards undefined; defaulting to 1, writing per bundle")
+      min_shards = 1
+
+      write_result_coll = (
+          pcoll
+          | core.WindowInto(window.GlobalWindows())
+          | 'WriteBundles' >> core.ParDo(
+              _WriteBundleDoFn(self.sink), AsSingleton(init_result_coll))
+          | 'Pair' >> core.Map(lambda x: (None, x))
+          | core.GroupByKey()
+          | 'Extract' >> core.FlatMap(lambda x: x[1]))
+
+    pre_finalize_coll = (
+        do_once
+        | 'PreFinalize' >> core.FlatMap(
+            _pre_finalize,
+            self.sink,
+            AsSingleton(init_result_coll),
+            AsIter(write_result_coll)))
+    return (
+        do_once | 'FinalizeWrite' >> core.FlatMap(
+            _finalize_write,
+            self.sink,
+            AsSingleton(init_result_coll),
+            AsIter(write_result_coll),
+            min_shards,
+            AsSingleton(pre_finalize_coll)).with_output_types(str))
+
+  def _apply_windowing(self, pcoll):
+    return (
+        pcoll  #TODO GroupIntoBatches and trigger indef per freq
+        | core.WindowInto(
+            window.FixedWindows(self.sink.triggering_frequency),
+            trigger=beam.transforms.trigger.AfterWatermark(),
+            accumulation_mode=beam.transforms.trigger.AccumulationMode.
+ DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) + + def _expand_unbounded(self, pcoll, min_shards): + """Handles the expansion logic for an unbounded PCollection.""" + if (min_shards >= 1): + #unbounded PCollection needes to be written per window + if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): + if (self.sink.triggering_frequency is None or + self.sink.triggering_frequency == 0): + raise ValueError( + 'To write a GlobalWindow unbounded PCollection, ' + 'triggering_frequency must be set and be greater than 0') + widowed_pcoll = self._apply_windowing(pcoll) + else: + #keep user windowing, unless triggering_frequency has been specified + if (self.sink.triggering_frequency is not None and + self.sink.triggering_frequency > 0): + widowed_pcoll = self._apply_windowing(pcoll) + else: #keep user windowing + widowed_pcoll = pcoll + if self.sink.convert_fn is not None: + widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) + if min_shards == 1: + keyed_pcoll = widowed_pcoll | core.Map(lambda x: (None, x)) + else: + keyed_pcoll = widowed_pcoll | core.ParDo( + _RoundRobinKeyFn(), count=min_shards) + init_result_window_coll = ( + keyed_pcoll + | 'Pair init' >> core.Map(lambda x: (None, x)) + | 'Pair init gbk' >> core.GroupByKey() + | 'InitializeWindowedWrite' >> core.Map( + lambda _, sink: sink.initialize_write(), self.sink)) + + write_result_coll = ( + keyed_pcoll + | 'Group by random key' >> core.GroupByKey() + | 'WriteWindowedBundles' >> core.ParDo( + _WriteWindowedBundleDoFn(sink=self.sink, per_key=True), + AsSingleton(init_result_window_coll)) + | 'Pair' >> core.Map(lambda x: (None, x)) + | core.GroupByKey() + | 'Extract' >> core.Map(lambda x: x[1])) + pre_finalized_write_result_coll = ( + write_result_coll + | 'PreFinalize' >> core.ParDo( + _PreFinalizeWindowedBundleDoFn(self.sink), + AsSingleton(init_result_window_coll))) + finalized_write_result_coll = ( + pre_finalized_write_result_coll + | 'FinalizeWrite' >> core.FlatMap( + _finalize_windowed_write, + self.sink, + AsSingleton(init_result_window_coll), + AsSingleton(write_result_coll), + min_shards, + AsIter(pre_finalized_write_result_coll)).with_output_types(str)) + return finalized_write_result_coll + else: + _LOGGER.info( + "*** WriteImpl min_shards undef so it's 1, and we write per Bundle") + #unbounded PCollection needes to be written per window + if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): + if (self.sink.triggering_frequency is None or + self.sink.triggering_frequency == 0): + raise ValueError( + 'To write a GlobalWindow PCollection, triggering_frequency must' + ' be set and be greater than 0') + widowed_pcoll = ( + pcoll #TODO GroupIntoBatches and trigger indef per freq + | core.WindowInto( + window.FixedWindows(self.sink.triggering_frequency), + trigger=beam.transforms.trigger.AfterWatermark(), + accumulation_mode=beam.transforms.trigger.AccumulationMode. + DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) + else: + #keep user windowing, unless triggering_frequency has been specified + if (self.sink.triggering_frequency is not None and + self.sink.triggering_frequency > 0): + widowed_pcoll = ( + pcoll #TODO GroupIntoBatches and trigger indef per freq + | core.WindowInto( + window.FixedWindows(self.sink.triggering_frequency), + trigger=beam.transforms.trigger.AfterWatermark(), + accumulation_mode=beam.transforms.trigger.AccumulationMode. 
+ DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) + else: #keep user windowing + widowed_pcoll = pcoll + init_result_window_coll = ( + widowed_pcoll + | 'Pair init' >> core.Map(lambda x: (None, x)) + | 'Pair init gbk' >> core.GroupByKey() + | 'InitializeWindowedWrite' >> core.Map( + lambda _, sink: sink.initialize_write(), self.sink)) + if self.sink.convert_fn is not None: + widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) + write_result_coll = ( + widowed_pcoll + | 'WriteWindowedBundles' >> core.ParDo( + _WriteWindowedBundleDoFn(self.sink), + AsSingleton(init_result_window_coll)) + | 'Pair' >> core.Map(lambda x: (None, x)) + | core.GroupByKey() + | 'Extract' >> core.Map(lambda x: x[1])) + pre_finalized_write_result_coll = ( + write_result_coll + | 'PreFinalize' >> core.ParDo( + _PreFinalizeWindowedBundleDoFn(self.sink), + AsSingleton(init_result_window_coll))) + finalized_write_result_coll = ( + pre_finalized_write_result_coll + | 'FinalizeWrite' >> core.FlatMap( + _finalize_windowed_write, + self.sink, + AsSingleton(init_result_window_coll), + AsSingleton(write_result_coll), + min_shards, + AsIter(pre_finalized_write_result_coll)).with_output_types(str)) + return finalized_write_result_coll + def expand(self, pcoll): + min_shards = getattr(self.sink, 'num_shards', 0) + + if (pcoll.is_bounded): + return self._expand_bounded(pcoll, min_shards) + else: + return self._expand_unbounded(pcoll, min_shards) + + def expandOld(self, pcoll): if (pcoll.is_bounded): do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None]) init_result_coll = do_once | 'InitializeWrite' >> core.Map( From a0b2b7d27494c4b6a068abd6155ec8ad25778ecd Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Mon, 16 Jun 2025 20:17:06 +0000 Subject: [PATCH 6/9] refactor _expand_unbounded , default num_shards to 1 , if undef or 0 --- sdks/python/apache_beam/io/iobase.py | 378 ++++++--------------------- 1 file changed, 74 insertions(+), 304 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 932d4b0953a0..d0d89cf1d151 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1215,322 +1215,92 @@ def _expand_bounded(self, pcoll, min_shards): AsSingleton(pre_finalize_coll)).with_output_types(str)) def _apply_windowing(self, pcoll): - return ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) + """ + Applies windowing to an unbounded PCollection based on the sink's + triggering frequency. + """ + use_fixed_windows = ( + self.sink.triggering_frequency is not None and + self.sink.triggering_frequency > 0) + + if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): + if not use_fixed_windows: + raise ValueError( + 'To write a GlobalWindow unbounded PCollection, ' + 'triggering_frequency must be set and be greater than 0') + return pcoll | 'ApplyFixedWindows' >> core.WindowInto( + window.FixedWindows(self.sink.triggering_frequency), + trigger=beam.transforms.trigger.AfterWatermark(), + accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0)) + + # Keep user-defined windowing unless triggering_frequency is specified. 
+ if use_fixed_windows: + return pcoll | 'ApplyFixedWindows' >> core.WindowInto( + window.FixedWindows(self.sink.triggering_frequency), + trigger=beam.transforms.trigger.AfterWatermark(), + accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING, + allowed_lateness=beam.utils.timestamp.Duration(seconds=0) + ) + + return pcoll # Keep original windowing def _expand_unbounded(self, pcoll, min_shards): """Handles the expansion logic for an unbounded PCollection.""" - if (min_shards >= 1): - #unbounded PCollection needes to be written per window - if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): - if (self.sink.triggering_frequency is None or - self.sink.triggering_frequency == 0): - raise ValueError( - 'To write a GlobalWindow unbounded PCollection, ' - 'triggering_frequency must be set and be greater than 0') - widowed_pcoll = self._apply_windowing(pcoll) - else: - #keep user windowing, unless triggering_frequency has been specified - if (self.sink.triggering_frequency is not None and - self.sink.triggering_frequency > 0): - widowed_pcoll = self._apply_windowing(pcoll) - else: #keep user windowing - widowed_pcoll = pcoll - if self.sink.convert_fn is not None: - widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) - if min_shards == 1: - keyed_pcoll = widowed_pcoll | core.Map(lambda x: (None, x)) - else: - keyed_pcoll = widowed_pcoll | core.ParDo( - _RoundRobinKeyFn(), count=min_shards) - init_result_window_coll = ( - keyed_pcoll - | 'Pair init' >> core.Map(lambda x: (None, x)) - | 'Pair init gbk' >> core.GroupByKey() - | 'InitializeWindowedWrite' >> core.Map( - lambda _, sink: sink.initialize_write(), self.sink)) + windowed_pcoll = self._apply_windowing(pcoll) + + if self.sink.convert_fn is not None: + windowed_pcoll = windowed_pcoll | core.ParDo(self.sink.convert_fn) + + init_result_window_coll = ( + windowed_pcoll + | 'PairInit' >> core.Map(lambda x: (None, x)) + | 'GroupInit' >> core.GroupByKey() + | 'InitializeWindowedWrite' >> core.Map( + lambda _, sink: sink.initialize_write(), self.sink)) + + if min_shards > 1: + keyed_pcoll = windowed_pcoll | core.ParDo( + _RoundRobinKeyFn(), count=min_shards) + grouped_pcoll = keyed_pcoll | 'Group by random key' >> core.GroupByKey() + else: # min_shards is 1 + _LOGGER.info("Writing unbounded data with a single shard per window.") + keyed_pcoll = windowed_pcoll | core.Map(lambda x: (None, x)) + grouped_pcoll = keyed_pcoll | 'Group by window' >> core.GroupByKey() + + write_result_coll = ( + grouped_pcoll + | 'WriteWindowedBundles' >> core.ParDo( + _WriteWindowedBundleDoFn(sink=self.sink, per_key=True), + AsSingleton(init_result_window_coll)) + | 'PairFinal' >> core.Map(lambda x: (None, x)) + | 'GroupFinal' >> core.GroupByKey() + | 'ExtractFinal' >> core.Map(lambda x: x[1])) + + pre_finalized_write_result_coll = ( + write_result_coll + | 'PreFinalize' >> core.ParDo( + _PreFinalizeWindowedBundleDoFn(self.sink), + AsSingleton(init_result_window_coll))) - write_result_coll = ( - keyed_pcoll - | 'Group by random key' >> core.GroupByKey() - | 'WriteWindowedBundles' >> core.ParDo( - _WriteWindowedBundleDoFn(sink=self.sink, per_key=True), - AsSingleton(init_result_window_coll)) - | 'Pair' >> core.Map(lambda x: (None, x)) - | core.GroupByKey() - | 'Extract' >> core.Map(lambda x: x[1])) - pre_finalized_write_result_coll = ( - write_result_coll - | 'PreFinalize' >> core.ParDo( - _PreFinalizeWindowedBundleDoFn(self.sink), - AsSingleton(init_result_window_coll))) - finalized_write_result_coll = ( - 
pre_finalized_write_result_coll - | 'FinalizeWrite' >> core.FlatMap( - _finalize_windowed_write, - self.sink, - AsSingleton(init_result_window_coll), - AsSingleton(write_result_coll), - min_shards, - AsIter(pre_finalized_write_result_coll)).with_output_types(str)) - return finalized_write_result_coll - else: - _LOGGER.info( - "*** WriteImpl min_shards undef so it's 1, and we write per Bundle") - #unbounded PCollection needes to be written per window - if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): - if (self.sink.triggering_frequency is None or - self.sink.triggering_frequency == 0): - raise ValueError( - 'To write a GlobalWindow PCollection, triggering_frequency must' - ' be set and be greater than 0') - widowed_pcoll = ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) - else: - #keep user windowing, unless triggering_frequency has been specified - if (self.sink.triggering_frequency is not None and - self.sink.triggering_frequency > 0): - widowed_pcoll = ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) - else: #keep user windowing - widowed_pcoll = pcoll - init_result_window_coll = ( - widowed_pcoll - | 'Pair init' >> core.Map(lambda x: (None, x)) - | 'Pair init gbk' >> core.GroupByKey() - | 'InitializeWindowedWrite' >> core.Map( - lambda _, sink: sink.initialize_write(), self.sink)) - if self.sink.convert_fn is not None: - widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) - write_result_coll = ( - widowed_pcoll - | 'WriteWindowedBundles' >> core.ParDo( - _WriteWindowedBundleDoFn(self.sink), - AsSingleton(init_result_window_coll)) - | 'Pair' >> core.Map(lambda x: (None, x)) - | core.GroupByKey() - | 'Extract' >> core.Map(lambda x: x[1])) - pre_finalized_write_result_coll = ( - write_result_coll - | 'PreFinalize' >> core.ParDo( - _PreFinalizeWindowedBundleDoFn(self.sink), - AsSingleton(init_result_window_coll))) - finalized_write_result_coll = ( - pre_finalized_write_result_coll - | 'FinalizeWrite' >> core.FlatMap( - _finalize_windowed_write, - self.sink, - AsSingleton(init_result_window_coll), - AsSingleton(write_result_coll), - min_shards, - AsIter(pre_finalized_write_result_coll)).with_output_types(str)) - return finalized_write_result_coll + return ( + pre_finalized_write_result_coll + | 'FinalizeWrite' >> core.FlatMap( + _finalize_windowed_write, + self.sink, + AsSingleton(init_result_window_coll), + AsSingleton(write_result_coll), + min_shards, + AsIter(pre_finalized_write_result_coll)).with_output_types(str)) def expand(self, pcoll): - min_shards = getattr(self.sink, 'num_shards', 0) + min_shards = getattr(self.sink, 'num_shards', 1) if (pcoll.is_bounded): return self._expand_bounded(pcoll, min_shards) else: return self._expand_unbounded(pcoll, min_shards) - def expandOld(self, pcoll): - if (pcoll.is_bounded): - do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None]) - init_result_coll = do_once | 'InitializeWrite' >> core.Map( - lambda _, sink: sink.initialize_write(), self.sink) - if getattr(self.sink, 
'num_shards', 0): - min_shards = self.sink.num_shards - - if (pcoll.is_bounded): - if min_shards == 1: - keyed_pcoll = pcoll | core.Map(lambda x: (None, x)) - else: - keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards) - write_result_coll = ( - keyed_pcoll - | core.WindowInto(window.GlobalWindows()) - | core.GroupByKey() - | 'WriteBundles' >> core.ParDo( - _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll)) - ) - else: #unbounded PCollection needes to be written per window - if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): - if (self.sink.triggering_frequency is None or - self.sink.triggering_frequency == 0): - raise ValueError( - 'To write a GlobalWindow unbounded PCollection, ' - 'triggering_frequency must be set and be greater than 0') - widowed_pcoll = ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) - else: - #keep user windowing, unless triggering_frequency has been specified - if (self.sink.triggering_frequency is not None and - self.sink.triggering_frequency > 0): - widowed_pcoll = ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) - else: #keep user windowing - widowed_pcoll = pcoll - if self.sink.convert_fn is not None: - widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) - if min_shards == 1: - keyed_pcoll = widowed_pcoll | core.Map(lambda x: (None, x)) - else: - keyed_pcoll = widowed_pcoll | core.ParDo( - _RoundRobinKeyFn(), count=min_shards) - init_result_window_coll = ( - keyed_pcoll - | 'Pair init' >> core.Map(lambda x: (None, x)) - | 'Pair init gbk' >> core.GroupByKey() - | 'InitializeWindowedWrite' >> core.Map( - lambda _, sink: sink.initialize_write(), self.sink)) - - write_result_coll = ( - keyed_pcoll - | 'Group by random key' >> core.GroupByKey() - | 'WriteWindowedBundles' >> core.ParDo( - _WriteWindowedBundleDoFn(sink=self.sink, per_key=True), - AsSingleton(init_result_window_coll)) - | 'Pair' >> core.Map(lambda x: (None, x)) - | core.GroupByKey() - | 'Extract' >> core.Map(lambda x: x[1])) - pre_finalized_write_result_coll = ( - write_result_coll - | 'PreFinalize' >> core.ParDo( - _PreFinalizeWindowedBundleDoFn(self.sink), - AsSingleton(init_result_window_coll))) - finalized_write_result_coll = ( - pre_finalized_write_result_coll - | 'FinalizeWrite' >> core.FlatMap( - _finalize_windowed_write, - self.sink, - AsSingleton(init_result_window_coll), - AsSingleton(write_result_coll), - min_shards, - AsIter(pre_finalized_write_result_coll)).with_output_types(str)) - return finalized_write_result_coll - else: - _LOGGER.info( - "*** WriteImpl min_shards undef so it's 1, and we write per Bundle") - min_shards = 1 - if (pcoll.is_bounded): - write_result_coll = ( - pcoll - | core.WindowInto(window.GlobalWindows()) - | 'WriteBundles' >> core.ParDo( - _WriteBundleDoFn(self.sink), AsSingleton(init_result_coll)) - | 'Pair' >> core.Map(lambda x: (None, x)) - | core.GroupByKey() - | 'Extract' >> core.FlatMap(lambda x: x[1])) - else: #unbounded PCollection needes to be written per 
window - if isinstance(pcoll.windowing.windowfn, window.GlobalWindows): - if (self.sink.triggering_frequency is None or - self.sink.triggering_frequency == 0): - raise ValueError( - 'To write a GlobalWindow PCollection, triggering_frequency must' - ' be set and be greater than 0') - widowed_pcoll = ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) - else: - #keep user windowing, unless triggering_frequency has been specified - if (self.sink.triggering_frequency is not None and - self.sink.triggering_frequency > 0): - widowed_pcoll = ( - pcoll #TODO GroupIntoBatches and trigger indef per freq - | core.WindowInto( - window.FixedWindows(self.sink.triggering_frequency), - trigger=beam.transforms.trigger.AfterWatermark(), - accumulation_mode=beam.transforms.trigger.AccumulationMode. - DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0))) - else: #keep user windowing - widowed_pcoll = pcoll - init_result_window_coll = ( - widowed_pcoll - | 'Pair init' >> core.Map(lambda x: (None, x)) - | 'Pair init gbk' >> core.GroupByKey() - | 'InitializeWindowedWrite' >> core.Map( - lambda _, sink: sink.initialize_write(), self.sink)) - if self.sink.convert_fn is not None: - widowed_pcoll = widowed_pcoll | core.ParDo(self.sink.convert_fn) - write_result_coll = ( - widowed_pcoll - | 'WriteWindowedBundles' >> core.ParDo( - _WriteWindowedBundleDoFn(self.sink), - AsSingleton(init_result_window_coll)) - | 'Pair' >> core.Map(lambda x: (None, x)) - | core.GroupByKey() - | 'Extract' >> core.Map(lambda x: x[1])) - pre_finalized_write_result_coll = ( - write_result_coll - | 'PreFinalize' >> core.ParDo( - _PreFinalizeWindowedBundleDoFn(self.sink), - AsSingleton(init_result_window_coll))) - finalized_write_result_coll = ( - pre_finalized_write_result_coll - | 'FinalizeWrite' >> core.FlatMap( - _finalize_windowed_write, - self.sink, - AsSingleton(init_result_window_coll), - AsSingleton(write_result_coll), - min_shards, - AsIter(pre_finalized_write_result_coll)).with_output_types(str)) - return finalized_write_result_coll - # PreFinalize should run before FinalizeWrite, and the two should not be - # fused. - if (pcoll.is_bounded): - pre_finalize_coll = ( - do_once - | 'PreFinalize' >> core.FlatMap( - _pre_finalize, - self.sink, - AsSingleton(init_result_coll), - AsIter(write_result_coll))) - return ( - do_once | 'FinalizeWrite' >> core.FlatMap( - _finalize_write, - self.sink, - AsSingleton(init_result_coll), - AsIter(write_result_coll), - min_shards, - AsSingleton(pre_finalize_coll)).with_output_types(str)) - class _WriteBundleDoFn(core.DoFn): """A DoFn for writing elements to an iobase.Writer. 
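To make the new hooks concrete, here is a minimal sketch of a sink that opts into the windowed write path as it stands after this refactor. All names here (`_BufferingWriter`, `BufferingSink`, the in-memory buffering) are hypothetical and only illustrate the contract; the overridden attributes and methods are the ones this series introduces. Note that the unbounded path also reads `sink.convert_fn`, so a sink must define it (here as `None`):

```python
import uuid

from apache_beam.io import iobase


class _BufferingWriter(iobase.Writer):
  """Illustrative writer: buffers the records of one bundle of one window."""
  def __init__(self, uid):
    self.uid = uid
    self.records = []

  def write(self, value):
    self.records.append(value)

  def at_capacity(self):
    # Queried by _WriteWindowedBundleDoFn after each write; returning False
    # means the writer is only closed at finish_bundle().
    return False

  def close(self):
    # Whatever is returned here becomes this bundle's "writer result".
    return (self.uid, len(self.records))


class BufferingSink(iobase.Sink):
  # Flush every 60 seconds when writing a GlobalWindows unbounded input.
  triggering_frequency = 60
  # Read by the unbounded path in WriteImpl; no element conversion needed.
  convert_fn = None

  def initialize_write(self):
    return 'init-%s' % uuid.uuid4()

  def open_writer(self, init_result, uid):
    return _BufferingWriter(uid)

  def pre_finalize_windowed(self, init_result, writer_results, window=None):
    # Verify or stage this window's temporary output before committing it.
    return None

  def finalize_windowed_write(
      self, init_result, writer_results, pre_finalize_result, window=None):
    # Commit this window's shards; each returned string is emitted downstream.
    return ['%s:%d' % (uid, count) for uid, count in writer_results]
```

With `num_shards` left unset, each window is written as a single shard; `finalize_windowed_write()` then receives that window's writer results, and its outputs are emitted timestamped at the window end.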
From 1a54a824316e1c6b63afe3c9b62851edd8bcd314 Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Mon, 16 Jun 2025 20:33:59 +0000 Subject: [PATCH 7/9] fix formatter --- sdks/python/apache_beam/io/iobase.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index d0d89cf1d151..828e9e05db66 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1240,8 +1240,7 @@ def _apply_windowing(self, pcoll): window.FixedWindows(self.sink.triggering_frequency), trigger=beam.transforms.trigger.AfterWatermark(), accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING, - allowed_lateness=beam.utils.timestamp.Duration(seconds=0) - ) + allowed_lateness=beam.utils.timestamp.Duration(seconds=0)) return pcoll # Keep original windowing From 4befbe194da08dd98cdd242d80fb7c06d8c549be Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Mon, 16 Jun 2025 20:48:23 +0000 Subject: [PATCH 8/9] space --- sdks/python/apache_beam/io/iobase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 828e9e05db66..24f82257ce0c 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1242,7 +1242,7 @@ def _apply_windowing(self, pcoll): accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING, allowed_lateness=beam.utils.timestamp.Duration(seconds=0)) - return pcoll # Keep original windowing + return pcoll # Keep original windowing def _expand_unbounded(self, pcoll, min_shards): """Handles the expansion logic for an unbounded PCollection.""" From 2edceebcd3515b457dded00f763b93a70d5fb30a Mon Sep 17 00:00:00 2001 From: Razvan Culea Date: Mon, 16 Jun 2025 20:52:00 +0000 Subject: [PATCH 9/9] keep num_shards = 0 the same as before for bounded write --- sdks/python/apache_beam/io/iobase.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 24f82257ce0c..67d6cd358a07 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1263,6 +1263,7 @@ def _expand_unbounded(self, pcoll, min_shards): _RoundRobinKeyFn(), count=min_shards) grouped_pcoll = keyed_pcoll | 'Group by random key' >> core.GroupByKey() else: # min_shards is 1 + min_shards = 1 _LOGGER.info("Writing unbounded data with a single shard per window.") keyed_pcoll = windowed_pcoll | core.Map(lambda x: (None, x)) grouped_pcoll = keyed_pcoll | 'Group by window' >> core.GroupByKey() @@ -1293,7 +1294,7 @@ def _expand_unbounded(self, pcoll, min_shards): AsIter(pre_finalized_write_result_coll)).with_output_types(str)) def expand(self, pcoll): - min_shards = getattr(self.sink, 'num_shards', 1) + min_shards = getattr(self.sink, 'num_shards', 0) if (pcoll.is_bounded): return self._expand_bounded(pcoll, min_shards)
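Finally, a hypothetical end-to-end sketch of how the finished series is meant to be driven: an unbounded input makes `pcoll.is_bounded` false, so `beam.io.Write` sends the data down the new windowed path and flushes once per `triggering_frequency`. `BufferingSink` refers to the illustrative sink sketched earlier; `PeriodicImpulse` is used here only to provide an unbounded source:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
  _ = (
      p
      # Unbounded source emitting a timestamp every 5 seconds.
      | PeriodicImpulse(fire_interval=5)
      | beam.Map(lambda ts: 'event at %s' % ts)
      # BufferingSink.triggering_frequency > 0, so the GlobalWindows input is
      # re-windowed and written once per window via the new hooks.
      | beam.io.Write(BufferingSink()))
```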