From a1d148f1369a4c03f8da305da5f8e3ab12eeb10c Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 16 Jul 2020 18:36:45 +1200 Subject: [PATCH 1/2] Avoid dead letter logs on event-sourced entity passivation --- .../cloudstate/proxy/eventsourced/EventSourcedEntity.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala index ee377987e..8429b68f8 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala @@ -78,7 +78,7 @@ final class EventSourcedEntitySupervisor(client: EventSourcedClient, NotUsed } ) - .runWith(Sink.actorRef(self, EventSourcedEntity.StreamClosed)) + .runWith(Sink.actorRef(self, EventSourcedEntity.StreamClosed, EventSourcedEntity.StreamFailed.apply)) context.become(waitingForRelay) } @@ -113,6 +113,7 @@ object EventSourcedEntity { final case object Stop final case object StreamClosed extends DeadLetterSuppression + final case class StreamFailed(cause: Throwable) extends DeadLetterSuppression final case class Configuration( serviceName: String, @@ -327,7 +328,7 @@ final class EventSourcedEntity(configuration: EventSourcedEntity.Configuration, notifyOutstandingRequests("Unexpected entity termination") context.stop(self) - case Status.Failure(error) => + case EventSourcedEntity.StreamFailed(error) => notifyOutstandingRequests("Unexpected entity termination") throw error From 1020fc00ece830d16724854980fd3b767a1c60ac Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 16 Jul 2020 18:36:47 +1200 Subject: [PATCH 2/2] Add graceful stop for event-sourced entities --- .../eventsourced/EventSourcedEntity.scala | 30 +++++++++++++++---- .../EventSourcedInstrumentationSpec.scala | 4 +-- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala index 8429b68f8..95deb1220 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/EventSourcedEntity.scala @@ -24,7 +24,7 @@ import akka.actor._ import akka.cluster.sharding.ShardRegion import akka.persistence._ import akka.stream.scaladsl._ -import akka.stream.{Materializer, OverflowStrategy} +import akka.stream.{CompletionStrategy, Materializer, OverflowStrategy} import akka.util.Timeout import com.google.protobuf.any.{Any => pbAny} import io.cloudstate.protocol.entity._ @@ -66,6 +66,8 @@ final class EventSourcedEntitySupervisor(client: EventSourcedClient, import EventSourcedEntitySupervisor._ + private var streamTerminated: Boolean = false + override final def receive: Receive = PartialFunction.empty override final def preStart(): Unit = { @@ -91,20 +93,38 @@ final class EventSourcedEntitySupervisor(client: EventSourcedClient, .actorOf(EventSourcedEntity.props(configuration, entityId, relayRef, concurrencyEnforcer, statsCollector), "entity") ) - context.become(forwarding(manager)) + context.become(forwarding(manager, relayRef)) unstashAll() case _ => stash() } - private[this] final def forwarding(manager: ActorRef): Receive = { + private[this] final def forwarding(manager: ActorRef, relay: ActorRef): Receive = { case Terminated(`manager`) => - context.stop(self) + if (streamTerminated) { + context.stop(self) + } else { + relay ! Status.Success(CompletionStrategy.draining) + context.become(stopping) + } case toParent if sender() == manager => context.parent ! toParent + case EventSourcedEntity.StreamClosed => + streamTerminated = true + manager forward EventSourcedEntity.StreamClosed + case failed: EventSourcedEntity.StreamFailed => + streamTerminated = true + manager forward failed case msg => manager forward msg } + private def stopping: Receive = { + case EventSourcedEntity.StreamClosed => + context.stop(self) + case _: EventSourcedEntity.StreamFailed => + context.stop(self) + } + override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy } @@ -184,8 +204,6 @@ final class EventSourcedEntity(configuration: EventSourcedEntity.Configuration, if (reportedDatabaseOperationStarted) { reportDatabaseOperationFinished() } - // This will shutdown the stream (if not already shut down) - relay ! Status.Success(()) instrumentation.entityPassivated() } diff --git a/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EventSourcedInstrumentationSpec.scala b/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EventSourcedInstrumentationSpec.scala index df68fce88..3047f90b7 100644 --- a/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EventSourcedInstrumentationSpec.scala +++ b/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EventSourcedInstrumentationSpec.scala @@ -16,7 +16,7 @@ package io.cloudstate.proxy.telemetry -import akka.actor.{ActorRef, Status} +import akka.actor.ActorRef import akka.testkit.TestProbe import com.google.protobuf.ByteString import com.google.protobuf.any.{Any => ProtoAny} @@ -240,8 +240,6 @@ class EventSourcedInstrumentationSpec extends AbstractTelemetrySpec { entity ! EventSourcedEntity.Stop - userFunction.expectMsg(Status.Success(())) - expectTerminated(entity) metricValue(PassivatedEntitiesTotal, EntityName -> "test") shouldBe 1