Skip to content
This repository was archived by the owner on Mar 16, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.actor._
import akka.cluster.sharding.ShardRegion
import akka.persistence._
import akka.stream.scaladsl._
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.{CompletionStrategy, Materializer, OverflowStrategy}
import akka.util.Timeout
import com.google.protobuf.any.{Any => pbAny}
import io.cloudstate.protocol.entity._
Expand Down Expand Up @@ -66,6 +66,8 @@ final class EventSourcedEntitySupervisor(client: EventSourcedClient,

import EventSourcedEntitySupervisor._

private var streamTerminated: Boolean = false

override final def receive: Receive = PartialFunction.empty

override final def preStart(): Unit = {
Expand All @@ -78,7 +80,7 @@ final class EventSourcedEntitySupervisor(client: EventSourcedClient,
NotUsed
}
)
.runWith(Sink.actorRef(self, EventSourcedEntity.StreamClosed))
.runWith(Sink.actorRef(self, EventSourcedEntity.StreamClosed, EventSourcedEntity.StreamFailed.apply))
context.become(waitingForRelay)
}

Expand All @@ -91,20 +93,38 @@ final class EventSourcedEntitySupervisor(client: EventSourcedClient,
.actorOf(EventSourcedEntity.props(configuration, entityId, relayRef, concurrencyEnforcer, statsCollector),
"entity")
)
context.become(forwarding(manager))
context.become(forwarding(manager, relayRef))
unstashAll()
case _ => stash()
}

private[this] final def forwarding(manager: ActorRef): Receive = {
private[this] final def forwarding(manager: ActorRef, relay: ActorRef): Receive = {
case Terminated(`manager`) =>
context.stop(self)
if (streamTerminated) {
context.stop(self)
} else {
relay ! Status.Success(CompletionStrategy.draining)
context.become(stopping)
}
case toParent if sender() == manager =>
context.parent ! toParent
case EventSourcedEntity.StreamClosed =>
streamTerminated = true
manager forward EventSourcedEntity.StreamClosed
case failed: EventSourcedEntity.StreamFailed =>
streamTerminated = true
manager forward failed
case msg =>
manager forward msg
}

private def stopping: Receive = {
case EventSourcedEntity.StreamClosed =>
context.stop(self)
case _: EventSourcedEntity.StreamFailed =>
context.stop(self)
}

override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
}

Expand All @@ -113,6 +133,7 @@ object EventSourcedEntity {
final case object Stop

final case object StreamClosed extends DeadLetterSuppression
final case class StreamFailed(cause: Throwable) extends DeadLetterSuppression

final case class Configuration(
serviceName: String,
Expand Down Expand Up @@ -183,8 +204,6 @@ final class EventSourcedEntity(configuration: EventSourcedEntity.Configuration,
if (reportedDatabaseOperationStarted) {
reportDatabaseOperationFinished()
}
// This will shutdown the stream (if not already shut down)
relay ! Status.Success(())
instrumentation.entityPassivated()
}

Expand Down Expand Up @@ -327,7 +346,7 @@ final class EventSourcedEntity(configuration: EventSourcedEntity.Configuration,
notifyOutstandingRequests("Unexpected entity termination")
context.stop(self)

case Status.Failure(error) =>
case EventSourcedEntity.StreamFailed(error) =>
notifyOutstandingRequests("Unexpected entity termination")
throw error

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.cloudstate.proxy.telemetry

import akka.actor.{ActorRef, Status}
import akka.actor.ActorRef
import akka.testkit.TestProbe
import com.google.protobuf.ByteString
import com.google.protobuf.any.{Any => ProtoAny}
Expand Down Expand Up @@ -240,8 +240,6 @@ class EventSourcedInstrumentationSpec extends AbstractTelemetrySpec {

entity ! EventSourcedEntity.Stop

userFunction.expectMsg(Status.Success(()))

expectTerminated(entity)

metricValue(PassivatedEntitiesTotal, EntityName -> "test") shouldBe 1
Expand Down