diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala index 9c1dde4c6e8..76bde981f82 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala @@ -28,7 +28,7 @@ import org.openjdk.jmh.annotations._ import org.apache.pekko import pekko.actor.ActorSystem -import pekko.stream.scaladsl._ +import pekko.stream.scaladsl.{ GatherCollector, Gatherer, Keep, OneToOneGatherer, Sink, Source } import com.typesafe.config.ConfigFactory @@ -76,6 +76,103 @@ class ZipWithIndexBenchmark { } .toMat(Sink.ignore)(Keep.right) + private val statefulMapZipWithIndex = Source + .repeat(1) + .take(OperationsPerInvocation) + .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None) + .toMat(Sink.ignore)(Keep.right) + + private val gatherPublicZipWithIndex = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new Gatherer[Int, (Int, Long)] { + private var index = 0L + + override def apply(elem: Int, collector: GatherCollector[(Int, Long)]): Unit = { + val zipped = (elem, index) + index += 1 + collector.push(zipped) + } + }) + .toMat(Sink.ignore)(Keep.right) + + private val gatherInternalOneToOneZipWithIndex = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new OneToOneGatherer[Int, (Int, Long)] { + private var index = 0L + + override def applyOne(elem: Int): (Int, Long) = { + val zipped = (elem, index) + index += 1 + zipped + } + }) + .toMat(Sink.ignore)(Keep.right) + + private val statefulMapIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .statefulMap(() => ())((state, elem) => (state, elem + 1), _ => None) + .toMat(Sink.ignore)(Keep.right) + + private val gatherPublicIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new Gatherer[Int, Int] { + override def 
apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem + 1) + }) + .toMat(Sink.ignore)(Keep.right) + + private val gatherInternalOneToOneIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new OneToOneGatherer[Int, Int] { + override def applyOne(elem: Int): Int = elem + 1 + }) + .toMat(Sink.ignore)(Keep.right) + + private val statefulMapCountedIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .statefulMap(() => 0L)((index, elem) => (index + 1, elem + index.toInt), _ => None) + .toMat(Sink.ignore)(Keep.right) + + private val gatherPublicCountedIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new Gatherer[Int, Int] { + private var index = 0L + + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + val incremented = elem + index.toInt + index += 1 + collector.push(incremented) + } + }) + .toMat(Sink.ignore)(Keep.right) + + private val gatherInternalOneToOneCountedIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new OneToOneGatherer[Int, Int] { + private var index = 0L + + override def applyOne(elem: Int): Int = { + val incremented = elem + index.toInt + index += 1 + incremented + } + }) + .toMat(Sink.ignore)(Keep.right) + @Benchmark @OperationsPerInvocation(OperationsPerInvocation) def benchOldZipWithIndex(): Unit = @@ -86,4 +183,49 @@ class ZipWithIndexBenchmark { def benchNewZipWithIndex(): Unit = Await.result(newZipWithIndex.run(), Duration.Inf) + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchStatefulMapZipWithIndex(): Unit = + Await.result(statefulMapZipWithIndex.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherPublicZipWithIndex(): Unit = + Await.result(gatherPublicZipWithIndex.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherInternalOneToOneZipWithIndex(): Unit = + 
Await.result(gatherInternalOneToOneZipWithIndex.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchStatefulMapIncrement(): Unit = + Await.result(statefulMapIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherPublicIncrement(): Unit = + Await.result(gatherPublicIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherInternalOneToOneIncrement(): Unit = + Await.result(gatherInternalOneToOneIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchStatefulMapCountedIncrement(): Unit = + Await.result(statefulMapCountedIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherPublicCountedIncrement(): Unit = + Await.result(gatherPublicCountedIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherInternalOneToOneCountedIncrement(): Unit = + Await.result(gatherInternalOneToOneCountedIncrement.run(), Duration.Inf) + } diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md new file mode 100644 index 00000000000..ec3850fef4c --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md @@ -0,0 +1,68 @@ +# gather + +Transform each input element into zero or more output elements using a stateful gatherer. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Flow.gather](Flow) { scala="#gather%5BT%5D%28create%3A%28%29%3D%3EGatherer%5BOut%2CT%5D%29%3ARepr%5BT%5D" java="#gather(org.apache.pekko.japi.function.Creator)" } + +## Description + +Transform each input element into zero or more output elements without requiring tuple or collection allocations +imposed by the operator API itself. 
+ +A new `Gatherer` is created for each materialization and can keep mutable state in fields or closures. +The provided `GatherCollector` can emit zero or more output elements for each input element. + +The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + +The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, +upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. +Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, +and are ignored on downstream cancellation and abrupt termination. + +The `gather` operator adheres to the @ref:[ActorAttributes.SupervisionStrategy](../../actors.md) attribute. + +For a simpler stateless mapping, use @ref:[map](map.md) or @ref:[mapConcat](mapConcat.md). + +## Examples + +In the first example, we implement a `zipWithIndex` operator like @ref:[zipWithIndex](zipWithIndex.md): + +Scala +: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #zipWithIndex } + +Java +: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #zipWithIndex } + +In the second example, elements are buffered until a different element arrives, then emitted: + +Scala +: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #bufferUntilChanged } + +Java +: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #bufferUntilChanged } + +In the third example, repeated incoming elements are only emitted once: + +Scala +: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #distinctUntilChanged } + +Java +: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #distinctUntilChanged } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the gatherer emits an element and 
downstream is ready to consume it + + **backpressures** when downstream backpressures + + **completes** when upstream completes and the gatherer has emitted all pending elements, including those emitted from `onComplete` + + **cancels** when downstream cancels + + @@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index f39c251bb8f..268ba92bfc7 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -165,6 +165,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.| |Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.| +|Source/Flow|@ref[gather](Source-or-Flow/gather.md)|Transform each input element into zero or more output elements with a stateful gatherer.| |Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.| |Source/Flow|@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.| |Source/Flow|@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.| @@ -498,6 +499,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [futureFlow](Flow/futureFlow.md) * [futureSink](Sink/futureSink.md) * [futureSource](Source/futureSource.md) +* 
[gather](Source-or-Flow/gather.md) * [groupBy](Source-or-Flow/groupBy.md) * [grouped](Source-or-Flow/grouped.md) * [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md) diff --git a/docs/src/test/java/jdocs/stream/operators/flow/Gather.java b/docs/src/test/java/jdocs/stream/operators/flow/Gather.java new file mode 100644 index 00000000000..a49ccefb3c2 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/flow/Gather.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package jdocs.stream.operators.flow; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.GatherCollector; +import org.apache.pekko.stream.javadsl.Gatherer; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + +public class Gather { + + static final ActorSystem system = null; + + static void zipWithIndex() { + // #zipWithIndex + Source.from(Arrays.asList("A", "B", "C", "D")) + .gather( + () -> + new Gatherer() { + private long index = 0L; + + @Override + public void apply(String elem, GatherCollector collector) { + collector.push("(" + elem + "," + index + ")"); + index += 1; + } + }) + .runWith(Sink.foreach(System.out::println), system); + // prints + // (A,0) + // (B,1) + // (C,2) + // (D,3) + // #zipWithIndex + } + + static void bufferUntilChanged() { + // #bufferUntilChanged + Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D")) + .gather( + () -> + new Gatherer>() { + private final List buffer = new ArrayList<>(); + + @Override + public void apply(String elem, GatherCollector> collector) { + if (!buffer.isEmpty() && !buffer.get(0).equals(elem)) { + collector.push(new ArrayList<>(buffer)); + buffer.clear(); + } + buffer.add(elem); + } + + @Override + public void onComplete(GatherCollector> collector) { + if (!buffer.isEmpty()) { + collector.push(new ArrayList<>(buffer)); + } + } + }) + .runWith(Sink.foreach(System.out::println), system); + // prints + // [A] + // [B, B] + // [C, C, C] + // [D] + // #bufferUntilChanged + } + + static void distinctUntilChanged() { + // #distinctUntilChanged + Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D")) + .gather( + () -> + new Gatherer() { + private String lastElement = null; + + @Override + public void apply(String elem, GatherCollector collector) { + if (!elem.equals(lastElement)) { + lastElement = elem; + collector.push(elem); + } + } + }) 
+ .runWith(Sink.foreach(System.out::println), system); + // prints + // A + // B + // C + // D + // #distinctUntilChanged + } +} diff --git a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala new file mode 100644 index 00000000000..f8f7beadc2c --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.flow + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.{ GatherCollector, Gatherer, Source } + +object Gather { + + implicit val actorSystem: ActorSystem = ??? 
+ + def zipWithIndex(): Unit = { + // #zipWithIndex + Source(List("A", "B", "C", "D")) + .gather(() => + new Gatherer[String, (String, Long)] { + private var index = 0L + + override def apply(elem: String, collector: GatherCollector[(String, Long)]): Unit = { + collector.push((elem, index)) + index += 1 + } + }) + .runForeach(println) + // prints + // (A,0) + // (B,1) + // (C,2) + // (D,3) + // #zipWithIndex + } + + def bufferUntilChanged(): Unit = { + // #bufferUntilChanged + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .gather(() => + new Gatherer[String, List[String]] { + private var buffer = List.empty[String] + + override def apply(elem: String, collector: GatherCollector[List[String]]): Unit = + buffer match { + case head :: _ if head != elem => + collector.push(buffer.reverse) + buffer = elem :: Nil + case _ => + buffer = elem :: buffer + } + + override def onComplete(collector: GatherCollector[List[String]]): Unit = + if (buffer.nonEmpty) + collector.push(buffer.reverse) + }) + .runForeach(println) + // prints + // List(A) + // List(B, B) + // List(C, C, C) + // List(D) + // #bufferUntilChanged + } + + def distinctUntilChanged(): Unit = { + // #distinctUntilChanged + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .gather(() => + new Gatherer[String, String] { + private var lastElement: Option[String] = None + + override def apply(elem: String, collector: GatherCollector[String]): Unit = + lastElement match { + case Some(last) if last == elem => + case _ => + lastElement = Some(elem) + collector.push(elem) + } + }) + .runForeach(println) + // prints + // A + // B + // C + // D + // #distinctUntilChanged + } +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f3058445066..90be09062a0 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -328,6 +328,64 @@ public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception { Assertions.assertEquals(1, closed.get()); } + @Test + public void mustBeAbleToUseGather() throws Exception { + final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5); + final CompletionStage grouped = + Source.from(input) + .via( + Flow.of(Integer.class) + .gather( + () -> + new Gatherer() { + private final ArrayList buffer = new ArrayList<>(2); + + @Override + public void apply(Integer elem, GatherCollector collector) { + buffer.add(elem); + if (buffer.size() == 2) { + collector.push(buffer.toString()); + buffer.clear(); + } + } + + @Override + public void onComplete(GatherCollector collector) { + if (!buffer.isEmpty()) { + collector.push(buffer.toString()); + } + } + })) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals( + "[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustBeAbleToUseGatherAsDistinctUntilChanged() throws Exception { + final CompletionStage result = + Source.from(Arrays.asList("A", "B", "B", "C", "C", "D")) + .via( + Flow.of(String.class) + .gather( + () -> + new Gatherer() { + private String lastSeen = null; + + @Override + public void apply(String elem, GatherCollector collector) { + if (!elem.equals(lastSeen)) { + collector.push(elem); + lastSeen = elem; + } + } + })) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals("ABCD", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseFoldWhile() throws Exception { final int result = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 3aec7545b09..4b6d5cc986d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -936,6 +936,46 @@ public void mustBeAbleToUseMapWithResource() { Assertions.assertFalse(gate.get()); } + @Test + public void mustBeAbleToUseGather() throws Exception { + final CompletionStage result = + Source.from(Arrays.asList(1, 2, 3, 4, 5)) + .gather( + () -> + new Gatherer() { + private int sum = 0; + + @Override + public void apply(Integer elem, GatherCollector collector) { + sum += elem; + collector.push(sum); + } + }) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals("1361015", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustBeAbleToUseGatherAsZipWithIndex() throws Exception { + final CompletionStage result = + Source.from(Arrays.asList("A", "B", "C", "D")) + .gather( + () -> + new Gatherer() { + private long index = 0L; + + @Override + public void apply(String elem, GatherCollector collector) { + collector.push(elem + ":" + index); + index += 1; + } + }) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals("A:0B:1C:2D:3", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala new file mode 100644 index 00000000000..46c7f060e97 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala @@ -0,0 +1,849 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import scala.annotation.nowarn +import scala.concurrent.{ Await, Promise } +import scala.concurrent.duration.DurationInt +import scala.util.Success +import scala.util.control.NoStackTrace + +import org.apache.pekko.Done +import org.apache.pekko.stream.{ + AbruptStageTerminationException, + ActorAttributes, + ActorMaterializer, + ClosedShape, + Supervision +} +import org.apache.pekko.stream.testkit.{ StreamSpec, TestSubscriber } +import org.apache.pekko.stream.testkit.Utils.TE +import org.apache.pekko.stream.testkit.scaladsl.{ TestSink, TestSource } +import org.apache.pekko.stream.scaladsl.{ Flow, Keep } +import org.apache.pekko.testkit.EventFilter + +class FlowGatherSpec extends StreamSpec { + + private val ex = new Exception("TEST") with NoStackTrace + + object BeenCalledTimesGate { + def apply(): BeenCalledTimesGate = new BeenCalledTimesGate(1) + def apply(nTimes: Int): BeenCalledTimesGate = new BeenCalledTimesGate(nTimes) + } + + class BeenCalledTimesGate(nTimes: Int) { + private val beenCalled = new AtomicInteger(0) + + def mark(): Unit = beenCalled.updateAndGet { current => + if (current == nTimes) + throw new IllegalStateException(s"Has been called:[$nTimes] times, should not be called anymore.") + else current + 1 + } + + def ensure(): Unit = + if (beenCalled.get() != nTimes) + 
throw new IllegalStateException(s"Expected to be called:[$nTimes], but only be called:[$beenCalled]") + } + + "A Gather" must { + "work in the happy case" in { + val gate = BeenCalledTimesGate() + Source(List(1, 2, 3, 4, 5)) + .gather(() => + new Gatherer[Int, (Int, Int)] { + private var agg = 0 + + override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit = { + collector.push((agg, elem)) + agg += elem + } + + override def onComplete(collector: GatherCollector[(Int, Int)]): Unit = + gate.mark() + }) + .runWith(TestSink[(Int, Int)]()) + .request(6) + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectNext((3, 3)) + .expectNext((6, 4)) + .expectNext((10, 5)) + .expectComplete() + gate.ensure() + } + + "remember state when complete" in { + val gate = BeenCalledTimesGate() + Source(1 to 10) + .gather(() => + new Gatherer[Int, List[Int]] { + private var state = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = { + val newState = elem :: state + if (newState.size == 3) { + state = Nil + collector.push(newState.reverse) + } else + state = newState + } + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = { + gate.mark() + collector.push(state.reverse) + } + }) + .mapConcat(identity) + .runWith(TestSink[Int]()) + .request(10) + .expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .expectComplete() + gate.ensure() + } + + "emit zero or more elements and drain on completion" in { + Source(1 to 5) + .gather(() => + new Gatherer[Int, List[Int]] { + private var buffer = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = { + buffer = elem :: buffer + if (buffer.size == 2) { + collector.push(buffer.reverse) + buffer = Nil + } + } + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = + if (buffer.nonEmpty) + collector.push(buffer.reverse) + }) + .runWith(TestSink[List[Int]]()) + .request(3) + .expectNext(List(1, 2)) + 
.expectNext(List(3, 4)) + .expectNext(List(5)) + .expectComplete() + } + + "emit all outputs when a callback deopts from single to multi mode" in { + Source.single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + collector.push(elem + 1) + } + }) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1, 2) + .expectComplete() + } + + "drop a single buffered output if apply throws after pushing it" in { + Source.single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + throw TE("boom") + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(TE("boom")) + } + + "resume without leaking a single buffered output if apply throws after pushing it" in { + Source(List(1, 2)) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + if (elem == 1) + throw ex + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(2) + .expectComplete() + } + + "be usable as zipWithIndex" in { + val gate = BeenCalledTimesGate() + Source(List("A", "B", "C", "D")) + .gather(() => + new Gatherer[String, (String, Long)] { + private var index = 0L + + override def apply(elem: String, collector: GatherCollector[(String, Long)]): Unit = { + collector.push((elem, index)) + index += 1 + } + + override def onComplete(collector: GatherCollector[(String, Long)]): Unit = + gate.mark() + }) + .runWith(TestSink[(String, Long)]()) + .request(4) + .expectNext(("A", 0L)) + .expectNext(("B", 1L)) + .expectNext(("C", 2L)) + .expectNext(("D", 3L)) + .expectComplete() + gate.ensure() + } + + "respect backpressure for public single-output gatherers" in { + Source(List("A", "B", "C")) + .gather(() => + new Gatherer[String, String] { + 
override def apply(elem: String, collector: GatherCollector[String]): Unit = + collector.push(elem) + }) + .runWith(TestSink[String]()) + .request(1) + .expectNext("A") + .expectNoMessage(200.millis) + .request(1) + .expectNext("B") + .request(1) + .expectNext("C") + .expectComplete() + } + + "respect backpressure for one-to-one gatherers" in { + Source(List("A", "B", "C")) + .gather(() => + new OneToOneGatherer[String, String] { + override def applyOne(elem: String): String = elem + }) + .runWith(TestSink[String]()) + .request(1) + .expectNext("A") + .expectNoMessage(200.millis) + .request(1) + .expectNext("B") + .request(1) + .expectNext("C") + .expectComplete() + } + + "be usable as bufferUntilChanged" in { + val gate = BeenCalledTimesGate() + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .gather(() => + new Gatherer[String, List[String]] { + private var buffer = List.empty[String] + + override def apply(elem: String, collector: GatherCollector[List[String]]): Unit = + buffer match { + case head :: _ if head != elem => + collector.push(buffer.reverse) + buffer = elem :: Nil + case _ => + buffer = elem :: buffer + } + + override def onComplete(collector: GatherCollector[List[String]]): Unit = { + gate.mark() + if (buffer.nonEmpty) + collector.push(buffer.reverse) + } + }) + .runWith(TestSink[List[String]]()) + .request(4) + .expectNext(List("A")) + .expectNext(List("B", "B")) + .expectNext(List("C", "C", "C")) + .expectNext(List("D")) + .expectComplete() + gate.ensure() + } + + "be usable as distinctUntilChanged" in { + val gate = BeenCalledTimesGate() + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .gather(() => + new Gatherer[String, String] { + private var lastElement: Option[String] = None + + override def apply(elem: String, collector: GatherCollector[String]): Unit = + lastElement match { + case Some(last) if last == elem => + case _ => + lastElement = Some(elem) + collector.push(elem) + } + + override def 
onComplete(collector: GatherCollector[String]): Unit = + gate.mark() + }) + .runWith(TestSink[String]()) + .request(4) + .expectNext("A") + .expectNext("B") + .expectNext("C") + .expectNext("D") + .expectComplete() + gate.ensure() + } + + "resume when supervision says Resume" in { + Source(List(1, 2, 3, 4, 5)) + .gather(() => + new Gatherer[Int, Int] { + private var sum = 0 + + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + if (elem % 2 == 0) + throw ex + else { + sum += elem + collector.push(sum) + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(3) + .expectNext(1, 4, 9) + .expectComplete() + } + + "emit onComplete elements before restarting" in { + val generation = new AtomicInteger(0) + val (source, sink) = TestSource[String]() + .viaMat(Flow[String].gather(() => { + val currentGeneration = generation.incrementAndGet() + new Gatherer[String, String] { + override def apply(elem: String, collector: GatherCollector[String]): Unit = + if (elem == "boom") throw TE("boom") + else collector.push(s"$elem$currentGeneration") + + override def onComplete(collector: GatherCollector[String]): Unit = + collector.push(s"onClose$currentGeneration") + } + }))(Keep.left) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .toMat(TestSink())(Keep.both) + .run() + + sink.request(1) + source.sendNext("one") + sink.expectNext("one1") + sink.request(1) + source.sendNext("boom") + sink.expectNext("onClose1") + sink.request(1) + source.sendNext("two") + sink.expectNext("two2") + sink.cancel() + source.expectCancellation() + } + + "restart and recreate gatherer state when supervision says Restart" in { + val generation = new AtomicInteger(0) + Source(List(1, 2, 3, 4, 5)) + .gather(() => { + generation.incrementAndGet() + new Gatherer[Int, (Int, Int)] { + private var agg = 0 + + override def apply(elem: Int, collector: GatherCollector[(Int, 
Int)]): Unit = + if (elem % 3 == 0) + throw ex + else { + collector.push((agg, elem)) + agg += elem + } + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[(Int, Int)]()) + .request(5) + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectNext((0, 4)) + .expectNext((4, 5)) + .expectComplete() + generation.get() shouldBe 2 + } + + "stop when supervision says Stop" in { + val gate = BeenCalledTimesGate() + Source(List(1, 2, 3, 4, 5)) + .gather(() => + new Gatherer[Int, (Int, Int)] { + private var agg = 0 + + override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit = + if (elem % 3 == 0) + throw ex + else { + collector.push((agg, elem)) + agg += elem + } + + override def onComplete(collector: GatherCollector[(Int, Int)]): Unit = + gate.mark() + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[(Int, Int)]()) + .request(5) + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectError(ex) + gate.ensure() + } + + "fail on upstream failure when onComplete emits nothing" in { + val gate = BeenCalledTimesGate() + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = + gate.mark() + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(3) + source.sendNext(1) + sink.expectNext(1) + source.sendNext(2) + sink.expectNext(2) + source.sendError(ex) + sink.expectError(ex) + gate.ensure() + } + + "defer upstream failure until onComplete elements are emitted" in { + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + private var sum = 0 + + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + sum += elem + collector.push(sum) + } + + override def onComplete(collector: GatherCollector[Int]): Unit 
= + collector.push(-1) + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(3) + source.sendNext(1) + sink.expectNext(1) + source.sendNext(2) + sink.expectNext(3) + source.sendError(ex) + sink.expectNext(-1) + sink.expectError(ex) + } + + "emit buffered elements before failing when supervision stops the stage" in { + Source(List(1, 2, 3)) + .gather(() => + new Gatherer[Int, List[Int]] { + private var buffer = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = + if (elem == 3) + throw ex + else + buffer = elem :: buffer + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = + if (buffer.nonEmpty) + collector.push(buffer.reverse) + }) + .runWith(TestSink[List[Int]]()) + .request(2) + .expectNext(List(1, 2)) + .expectError(ex) + } + + "call onComplete when supervision stops the stage" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + val done = Source + .single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + throw ex + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.ignore) + + done.failed.futureValue shouldBe ex + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "cancel upstream when downstream cancels" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + val source = TestSource[Int]() + .via(Flow[Int].gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + })) + .toMat(Sink.cancelled)(Keep.left) + .run() + + source.expectCancellation() + Await.result(promise.future, 
3.seconds) shouldBe Done + gate.ensure() + } + + "cancel upstream when downstream fails" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + val testProbe = TestSubscriber.probe[Int]() + val source = TestSource[Int]() + .via(Flow[Int].gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + })) + .toMat(Sink.fromSubscriber(testProbe))(Keep.left) + .run() + + testProbe.cancel(ex) + source.expectCancellationWithCause(ex) + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "invoke onComplete exactly once when downstream cancels while draining final elements" in { + val closedCounter = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + closedCounter.incrementAndGet() + collector.push(100) + collector.push(200) + } + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(2) + source.sendNext(1) + sink.expectNext(1) + source.sendComplete() + sink.expectNext(100) + sink.cancel() + closedCounter.get() shouldBe 1 + } + + "not restart gatherer when downstream cancels while draining restart elements" in { + val generation = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .gather(() => { + val currentGeneration = generation.incrementAndGet() + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + throw TE(s"boom-$currentGeneration") + + override def onComplete(collector: GatherCollector[Int]): Unit = { + collector.push(100 + currentGeneration) + collector.push(200 + currentGeneration) + } + } + }) + 
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(1) + source.sendNext(1) + sink.expectNext(101) + sink.cancel() + generation.get() shouldBe 1 + } + + "fail when emitting null" in { + Source.single(1) + .gather(() => + new Gatherer[Int, String] { + override def apply(elem: Int, collector: GatherCollector[String]): Unit = + collector.push(null.asInstanceOf[String]) + }) + .runWith(TestSink[String]()) + .request(1) + .expectError() shouldBe a[NullPointerException] + } + + "call onComplete on abrupt materializer termination" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + @nowarn("msg=deprecated") + val mat = ActorMaterializer() + + val matVal = Source + .single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + }) + .runWith(Sink.never)(mat) + + mat.shutdown() + matVal.failed.futureValue shouldBe a[AbruptStageTerminationException] + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "will not call onComplete twice if apply fails" in { + val closedCounter = new AtomicInteger(0) + val probe = Source + .repeat(1) + .gather(() => + new Gatherer[Int, String] { + override def apply(elem: Int, collector: GatherCollector[String]): Unit = + throw TE("failing read") + + override def onComplete(collector: GatherCollector[String]): Unit = + closedCounter.incrementAndGet() + }) + .runWith(TestSink[String]()) + + probe.request(1) + probe.expectError(TE("failing read")) + closedCounter.get() shouldBe 1 + } + + "will not call onComplete twice if both apply and onComplete fail" in { + val closedCounter = new AtomicInteger(0) + val probe = Source + .repeat(1) + .gather(() => + new Gatherer[Int, Int] { + override def 
apply(elem: Int, collector: GatherCollector[Int]): Unit = + throw TE("failing read") + + override def onComplete(collector: GatherCollector[Int]): Unit = + if (closedCounter.incrementAndGet() == 1) + throw TE("boom") + }) + .runWith(TestSink[Int]()) + + EventFilter[TE](occurrences = 1).intercept { + probe.request(1) + probe.expectError(TE("boom")) + } + closedCounter.get() shouldBe 1 + } + + "will not call onComplete twice on cancel when onComplete fails" in { + val closedCounter = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .viaMat(Flow[Int].gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + closedCounter.incrementAndGet() + throw TE("boom") + } + }))(Keep.left) + .toMat(TestSink[Int]())(Keep.both) + .run() + + EventFilter[TE](occurrences = 1).intercept { + sink.request(1) + source.sendNext(1) + sink.expectNext(1) + sink.cancel() + source.expectCancellation() + } + closedCounter.get() shouldBe 1 + } + + "will not call onComplete twice if onComplete fails on upstream complete" in { + val closedCounter = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + closedCounter.incrementAndGet() + throw TE("boom") + } + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + EventFilter[TE](occurrences = 1).intercept { + sink.request(1) + source.sendNext(1) + sink.expectNext(1) + sink.request(1) + source.sendComplete() + sink.expectError(TE("boom")) + } + closedCounter.get() shouldBe 1 + } + } + + "create independent gatherer instances per materialization" in { + val stateCounter = new AtomicInteger(0) + val flow = Flow[Int] + .gather(() => { + stateCounter.incrementAndGet() + new 
Gatherer[Int, Int] { + private var acc = 0 + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + acc += elem + collector.push(acc) + } + } + }) + + val source1 = Source(1 to 3).via(flow).runWith(TestSink[Int]()) + val source2 = Source(10 to 12).via(flow).runWith(TestSink[Int]()) + + source1.request(3) + .expectNext(1, 3, 6) + .expectComplete() + source2.request(3) + .expectNext(10, 21, 33) + .expectComplete() + + // Factory should be called once per materialization + stateCounter.get() shouldBe 2 + } + + "call onComplete for empty upstream" in { + val gate = BeenCalledTimesGate() + Source.empty[Int] + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = + gate.mark() + }) + .runWith(TestSink[Int]()) + .request(1) + .expectComplete() + gate.ensure() + } + + "fail when onComplete emits null" in { + Source.single(1) + .gather(() => + new Gatherer[Int, String] { + override def apply(elem: Int, collector: GatherCollector[String]): Unit = () + override def onComplete(collector: GatherCollector[String]): Unit = + collector.push(null.asInstanceOf[String]) + }) + .runWith(TestSink[String]()) + .request(1) + .expectError() shouldBe a[NullPointerException] + } + + "handle 3+ outputs with backpressure mid-drain" in { + Source.single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + collector.push(elem + 1) + collector.push(elem + 2) + collector.push(elem + 3) + } + }) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1, 2) + .expectNoMessage(200.millis) + .request(2) + .expectNext(3, 4) + .expectComplete() + } + + "support junction output ports" in { + val source = Source(List((1, 1), (2, 2))) + val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink[(Int, Int)]()) { implicit b => sink => + 
import GraphDSL.Implicits._ + val unzip = b.add(Unzip[Int, Int]()) + val zip = b.add(Zip[Int, Int]()) + val gather = b.add(Flow[(Int, Int)].gather(() => + (elem: (Int, Int), collector: GatherCollector[(Int, Int)]) => collector.push(elem))) + + source ~> unzip.in + unzip.out0 ~> zip.in0 + unzip.out1 ~> zip.in1 + zip.out ~> gather ~> sink.in + + ClosedShape + }) + + graph + .run() + .request(2) + .expectNext((1, 1)) + .expectNext((2, 2)) + .expectComplete() + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 6de53600bd7..09459906c02 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -74,6 +74,7 @@ import pekko.stream.Attributes._ val batchWeighted = name("batchWeighted") val expand = name("expand") val statefulMap = name("statefulMap") + val gather = name("gather") val statefulMapConcat = name("statefulMapConcat") val mapConcat = name("mapConcat") val detacher = name("detacher") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index d214479d4ec..e7955dcad2e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -48,10 +48,14 @@ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource } import pekko.stream.scaladsl.{ DelayStrategy, + GatherCollector, + Gatherer, + OneToOneGatherer, Source, StatefulMapConcatAccumulator, StatefulMapConcatAccumulatorFactory } +import pekko.stream.javadsl.{ Gatherers => JGatherers } import pekko.stream.stage._ import pekko.util.{ ConstantFun, OptionVal } @@ -2328,6 +2332,325 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: 
(S, In) = override def toString = "StatefulMap" } +/** + * INTERNAL API + */ +@InternalApi +private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) extends GraphStage[FlowShape[In, Out]] { + require(factory != null, "factory should not be null") + + private val in = Inlet[In]("Gather.in") + private val out = Outlet[Out]("Gather.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.gather and SourceLocation.forLambda(factory) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private object FinalAction { + final val None = 0 + final val Complete = 1 + final val Restart = 2 + final val Fail = 3 + } + + private lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private val contextPropagation = ContextPropagation() + private val noopCollector = new GatherCollector[Out] { + override def push(elem: Out): Unit = () + } + // Hot-path collector: buffers the first push to callbackFirst for exception-rollback safety. + // Uses `callbackFirst eq null` as a sentinel instead of a separate boolean flag. + // On a second push (multi-output): moves callbackFirst -> pendingFirst and enters multi-mode. + private val singleCollector = new GatherCollector[Out] { + override def push(elem: Out): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + if (hasPendingFirst) { + // Already in multi mode: all pushes go directly to overflow queue. + if (pendingOverflow eq null) + pendingOverflow = new java.util.ArrayDeque[Out]() + pendingOverflow.addLast(elem) + } else { + val cb = callbackFirst + if (cb.asInstanceOf[AnyRef] eq null) { + callbackFirst = elem + } else { + // Second output from this gather call: transition to multi mode. 
+ pendingFirst = cb + hasPendingFirst = true + callbackFirst = null.asInstanceOf[Out] + multiMode = true + if (pendingOverflow eq null) + pendingOverflow = new java.util.ArrayDeque[Out]() + pendingOverflow.addLast(elem) + } + } + } + } + // Used only by onPushMulti and invokeOnCompleteAndThen (always queues, never direct). + private val pendingCollector = new GatherCollector[Out] { + override def push(elem: Out): Unit = enqueuePendingOutput(elem) + } + // callbackFirst serves as a sentinel: null = "no output buffered this gather call"; + // non-null = "one output is buffered, not yet pushed to downstream". + // This eliminates the separate hasCallbackFirst boolean and its per-element writes. + private var callbackFirst: Out = _ + private var pendingFirst: Out = _ + private var pendingOverflow: java.util.ArrayDeque[Out] = _ + private var hasPendingFirst = false + private var multiMode = false + private var gatherer: Gatherer[In, Out] = _ + // Hot-path handle for one-to-one mappings. Supports both Scala and Java DSL implementations. 
+ private var oneToOneGatherer: AnyRef = _ + private var finalAction = FinalAction.None + private var finalFailure: Throwable = null + private var needInvokeOnCompleteCallback = false + private var downstreamFinished = false + + override def preStart(): Unit = { + restartGatherer() + pull(in) + } + + override def onPush(): Unit = + try { + if (oneToOneGatherer ne null) onPushOneToOne() + else if (multiMode) onPushMulti() + else onPushSingle() + } catch { + case NonFatal(ex) => + clearPending() + if (!downstreamFinished) + decider(ex) match { + case Supervision.Stop => invokeOnCompleteAndThen(FinalAction.Fail, ex) + case Supervision.Resume => maybePull() + case Supervision.Restart => invokeOnCompleteAndThen(FinalAction.Restart) + } + } + + override def onPull(): Unit = + if (hasPendingFirst) { + if (multiMode) + pushPendingMulti(shouldResumeContext = needInvokeOnCompleteCallback) + else + pushPendingSingle(shouldResumeContext = needInvokeOnCompleteCallback) + } else maybePull() + + override def onUpstreamFinish(): Unit = + if (hasPendingFirst) { + if (finalAction != FinalAction.Fail) + finalAction = FinalAction.Complete + } else invokeOnCompleteAndThen(FinalAction.Complete) + + override def onUpstreamFailure(ex: Throwable): Unit = + if (hasPendingFirst) { + finalFailure = ex + finalAction = FinalAction.Fail + } else invokeOnCompleteAndThen(FinalAction.Fail, ex) + + override def onDownstreamFinish(cause: Throwable): Unit = { + downstreamFinished = true + if (needInvokeOnCompleteCallback) { + needInvokeOnCompleteCallback = false + gatherer.onComplete(noopCollector) + } + super.onDownstreamFinish(cause) + } + + override def postStop(): Unit = { + if (needInvokeOnCompleteCallback) + gatherer.onComplete(noopCollector) + } + + private def enqueuePendingOutput(elem: Out): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + if (hasPendingFirst) { + multiMode = true + if (pendingOverflow eq null) + pendingOverflow = new java.util.ArrayDeque[Out]() + 
pendingOverflow.addLast(elem) + } else { + pendingFirst = elem + hasPendingFirst = true + } + } + + private def onPushOneToOne(): Unit = { + val elem = oneToOneGatherer match { + case s: OneToOneGatherer[In, Out] @unchecked => s.applyOne(grab(in)) + case j: JGatherers.OneToOneGatherer[In, Out] @unchecked => j.applyOne(grab(in)) + } + ReactiveStreamsCompliance.requireNonNullElement(elem) + if (isAvailable(out)) + push(out, elem) + else { + pendingFirst = elem + hasPendingFirst = true + contextPropagation.suspendContext() + } + } + + // Optimised hot path for the common case (<=1 output per gather call, output available). + // The callbackFirst sentinel eliminates the hasCallbackFirst boolean field and its writes. + // After a successful push the pull is inlined, removing the maybePull call chain that was + // adding extra virtual dispatches per element. + private def onPushSingle(): Unit = { + gatherer(grab(in), singleCollector) + val cb = callbackFirst + if (cb.asInstanceOf[AnyRef] ne null) { + // Exactly 1 output buffered: clear sentinel, then push. + callbackFirst = null.asInstanceOf[Out] + if (isAvailable(out)) { + push(out, cb) + // Inline the common post-push continuation: pull for next element. + // finalAction is almost never set on the hot path; keep it in a cold branch. + if (finalAction == FinalAction.None) { + // grab() cleared hasBeenPulled, so pull(in) is always safe here. + if (!isClosed(in)) pull(in) + } else + afterPushFinalAction() + } else { + pendingFirst = cb + hasPendingFirst = true + contextPropagation.suspendContext() + } + } else if (hasPendingFirst && isAvailable(out)) { + // 2+ outputs from this gather call (multi-output case). 
+ if (multiMode) + pushPendingMulti(shouldResumeContext = false) + else + pushPendingSingle(shouldResumeContext = false) + } else if (hasPendingFirst) + contextPropagation.suspendContext() + else + maybePull() // 0 outputs: pull immediately for the next element + } + + private def onPushMulti(): Unit = { + gatherer(grab(in), pendingCollector) + if (hasPendingFirst && isAvailable(out)) + pushPendingMulti(shouldResumeContext = false) + else if (hasPendingFirst) + contextPropagation.suspendContext() + else + maybePull() + } + + // Cold path: handles a deferred final action after an inlined push in onPushSingle. + private def afterPushFinalAction(): Unit = { + if (downstreamFinished || isClosed(out)) { + finalAction = FinalAction.None + finalFailure = null + } else { + val action = finalAction + val failure = finalFailure + finalAction = FinalAction.None + finalFailure = null + if (needInvokeOnCompleteCallback) invokeOnCompleteAndThen(action, failure) + else execute(action, failure) + } + } + + private def maybePull(): Unit = + if (!isClosed(in) && !hasBeenPulled(in)) + pull(in) + + private def restartGatherer(): Unit = { + val newGatherer = factory() + if (newGatherer eq null) + throw new IllegalStateException("Gatherer factory must not return null") + gatherer = newGatherer + oneToOneGatherer = gatherer match { + case _: OneToOneGatherer[?, ?] => gatherer + case _: JGatherers.OneToOneGatherer[?, ?] 
=> gatherer + case _ => null + } + multiMode = false + pendingOverflow = null + needInvokeOnCompleteCallback = true + } + + private def clearPending(): Unit = { + callbackFirst = null.asInstanceOf[Out] + pendingFirst = null.asInstanceOf[Out] + hasPendingFirst = false + if (pendingOverflow ne null) + pendingOverflow.clear() + } + + private def pushPendingSingle(shouldResumeContext: Boolean): Unit = { + if (shouldResumeContext && needInvokeOnCompleteCallback) + contextPropagation.resumeContext() + val elem = pendingFirst + pendingFirst = null.asInstanceOf[Out] + hasPendingFirst = false + push(out, elem) + maybeRunFinalAction() + } + + private def pushPendingMulti(shouldResumeContext: Boolean): Unit = { + if (shouldResumeContext && needInvokeOnCompleteCallback) + contextPropagation.resumeContext() + push(out, pendingFirst) + if ((pendingOverflow ne null) && !pendingOverflow.isEmpty) { + pendingFirst = pendingOverflow.removeFirst() + if (needInvokeOnCompleteCallback) + contextPropagation.suspendContext() + } else { + pendingFirst = null.asInstanceOf[Out] + hasPendingFirst = false + multiMode = false // reset to single-output fast path after draining overflow queue + maybeRunFinalAction() + } + } + + private def maybeRunFinalAction(): Unit = { + if (downstreamFinished || isClosed(out)) { + finalAction = FinalAction.None + finalFailure = null + } else if (finalAction == FinalAction.None) + maybePull() + else { + val action = finalAction + val failure = finalFailure + finalAction = FinalAction.None + finalFailure = null + if (needInvokeOnCompleteCallback) invokeOnCompleteAndThen(action, failure) + else execute(action, failure) + } + } + + private def invokeOnCompleteAndThen(action: Int, failure: Throwable = null): Unit = { + needInvokeOnCompleteCallback = false + gatherer.onComplete(pendingCollector) + if (hasPendingFirst) { + finalAction = action + finalFailure = failure + if (isAvailable(out)) + if (multiMode) + pushPendingMulti(shouldResumeContext = false) + else + 
pushPendingSingle(shouldResumeContext = false) + } else + execute(action, failure) + } + + private def execute(action: Int, failure: Throwable): Unit = + action match { + case FinalAction.None => maybePull() + case FinalAction.Complete => completeStage() + case FinalAction.Fail => failStage(failure) + case FinalAction.Restart => + restartGatherer() + maybePull() + } + + setHandlers(in, out, this) + } + + override def toString = "Gather" +} + /** * INTERNAL API */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1622dad4c6e..0bdd5de1293 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -693,7 +693,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @since 1.3.0 + * @since 2.0.0 */ def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.map(f(_)).collect { @@ -868,6 +868,52 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. 
+ * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala new file mode 100644 index 00000000000..d65aaeb4111 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.javadsl + +import org.apache.pekko.annotation.DoNotInherit +import org.apache.pekko.japi.function + +/** + * Collector passed to [[Gatherer]] for emitting output elements. + * + * The collector is only valid while the current [[Gatherer]] callback is running. + * Emitted elements MUST NOT be `null`. + * + * @since 1.3.0 + */ +@DoNotInherit +trait GatherCollector[-Out] extends function.Procedure[Out] { + def push(elem: Out): Unit + + final override def apply(param: Out): Unit = push(param) +} + +/** + * A stateful gatherer for the `gather` operator. + * + * A new gatherer instance is created for each materialization and on each supervision restart. + * It can keep mutable state in fields or via captured variables. + * + * @since 1.3.0 + */ +@FunctionalInterface +trait Gatherer[-In, Out] extends function.Procedure2[In, GatherCollector[Out]] { + + /** + * Called once whenever the stage terminates or restarts: on upstream completion, upstream failure, + * downstream cancellation, abrupt stage termination, or when the stage is restarted due to supervision. + * + * Elements pushed to the collector are emitted only on upstream completion, upstream failure, + * or supervision restart. They are ignored on downstream cancellation and abrupt termination. 
+ */ + def onComplete(collector: GatherCollector[Out]): Unit = () +} + +/** Factory methods for [[Gatherer]]. */ +object Gatherers { + + /** + * Creates a specialized `Gatherer` for one-to-one transformations (exactly one output per input). + * + * This variant avoids the overhead of the `GatherCollector` indirection and achieves the + * same performance as the native `map` operator while still supporting mutable state and + * the `onComplete` callback. + * + * @param f the one-to-one transformation function + * @since 1.3.0 + */ + def oneToOne[In, Out](f: function.Function[In, Out]): Gatherer[In, Out] = + new OneToOneGathererImpl[In, Out](f) + + /** + * Creates a specialized `Gatherer` for one-to-one transformations with an `onComplete` callback. + * + * @param f the one-to-one transformation function + * @param onComplete callback invoked when the stage terminates or restarts + * @since 1.3.0 + */ + def oneToOne[In, Out](f: function.Function[In, Out], onComplete: function.Effect): Gatherer[In, Out] = + new OneToOneGathererImpl[In, Out](f, onComplete) + + /** + * A specialized [[Gatherer]] for one-to-one transformations that avoids the `GatherCollector` overhead. 
+ * + * @since 1.3.0 + */ + @DoNotInherit + trait OneToOneGatherer[In, Out] extends Gatherer[In, Out] { + def applyOne(in: In): Out + + final override def apply(in: In, collector: GatherCollector[Out]): Unit = + collector.push(applyOne(in)) + } + + private final class OneToOneGathererImpl[In, Out]( + f: function.Function[In, Out], + onCompleteCallback: function.Effect = null) + extends OneToOneGatherer[In, Out] { + override def applyOne(in: In): Out = f.apply(in) + override def onComplete(@annotation.nowarn collector: GatherCollector[Out]): Unit = + if (onCompleteCallback != null) onCompleteCallback.apply() + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index fe5b1f5ef0f..ee8d25a6701 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -209,7 +209,7 @@ object Source { /** * Create a `Source` from an `Optional` value, emitting the value if it is present. * - * @since 1.3.0 + * @since 2.0.0 */ def fromOption[T](optional: Optional[T]): Source[T, NotUsed] = if (optional.isPresent) single(optional.get()) else empty() @@ -2761,6 +2761,52 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. 
+ * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.Source[T, Mat] = + new Source(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. 
The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 14f525bb70f..0ce6b0fa478 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -168,7 +168,7 @@ final class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels * - * @since 1.3.0 + * @since 2.0.0 */ def mapOption[T](f: function.Function[Out, Optional[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.map(f(_)).collect { @@ -345,6 +345,52 @@ final class SubFlow[In, Out, Mat]( Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. 
+ * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 2.0.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.SubFlow[In, T, Mat] = + new SubFlow(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. 
The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 7edb90afd92..0513d06fe44 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -159,7 +159,7 @@ final class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels * - * @since 1.3.0 + * @since 2.0.0 */ def mapOption[T](f: function.Function[Out, Optional[T]]): SubSource[T, Mat] = new SubSource(delegate.map(f(_)).collect { @@ -336,6 +336,52 @@ final class SubSource[Out, Mat]( Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. 
+ * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 2.0.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.SubSource[T, Mat] = + new SubSource(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. 
The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 49d0c8382ef..9d46820b863 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1011,7 +1011,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * * @param errorConsumer function invoked when an error occurs - * @since 1.3.0 + * @since 2.0.0 */ def onErrorContinue[T <: Throwable](errorConsumer: Throwable => Unit)(implicit tag: ClassTag[T]): Repr[Out] = { this.withAttributes(ActorAttributes.supervisionStrategy { @@ -1264,6 +1264,35 @@ trait FlowOps[+Out, +Mat] { None }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or closures. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. 
+ * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 2.0.0 + */ + def gather[T](create: () => Gatherer[Out, T]): Repr[T] = + via(new Gather[Out, T](create)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala new file mode 100644 index 00000000000..85fa35a0edf --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko.annotation.DoNotInherit + +/** + * Collector passed to [[Gatherer]] for emitting output elements. + * + * The collector is only valid while the current [[Gatherer]] callback is running. 
+ * + * @since 2.0.0 + */ +@DoNotInherit +trait GatherCollector[-Out] { + def push(elem: Out): Unit +} + +/** + * A stateful gatherer for the `gather` operator. + * + * A new gatherer instance is created for each materialization and on each supervision restart. + * It can keep mutable state in fields or closures. + * + * @since 2.0.0 + */ +@FunctionalInterface +trait Gatherer[-In, +Out] { + def apply(in: In, collector: GatherCollector[Out]): Unit + + /** + * Called once whenever the stage terminates or restarts: on upstream completion, upstream failure, + * downstream cancellation, abrupt stage termination, or when the stage is restarted due to supervision. + * + * Elements pushed to the collector are emitted only on upstream completion, upstream failure, + * or supervision restart. They are ignored on downstream cancellation and abrupt termination. + */ + def onComplete(collector: GatherCollector[Out]): Unit = () +} + +/** Factory methods for [[Gatherer]]. */ +object Gatherer { + + /** + * Creates a specialized `Gatherer` for one-to-one transformations (exactly one output per input). + * + * This variant avoids the overhead of the `GatherCollector` indirection and achieves the + * same performance as the native `map` operator while still supporting mutable state and + * the `onComplete` callback. + * + * @param f the one-to-one transformation function + * @since 2.0.0 + */ + def oneToOne[In, Out](f: In => Out): Gatherer[In, Out] = + new OneToOneGathererImpl(f) + + /** + * Creates a specialized `Gatherer` for one-to-one transformations with an `onComplete` callback. 
+ * + * @param f the one-to-one transformation function + * @param onComplete callback invoked when the stage terminates or restarts + * @since 2.0.0 + */ + def oneToOne[In, Out](f: In => Out, onComplete: () => Unit): Gatherer[In, Out] = + new OneToOneGathererImpl(f, onComplete) + + private final class OneToOneGathererImpl[In, Out]( + f: In => Out, + onCompleteCallback: () => Unit = () => ()) + extends OneToOneGatherer[In, Out] { + override def applyOne(in: In): Out = f(in) + override def onComplete(collector: GatherCollector[Out]): Unit = onCompleteCallback() + } +} + +/** + * INTERNAL API + */ +@DoNotInherit +@FunctionalInterface +private[stream] trait OneToOneGatherer[-In, +Out] extends Gatherer[In, Out] { + def applyOne(in: In): Out + + final override def apply(in: In, collector: GatherCollector[Out]): Unit = + collector.push(applyOne(in)) +}