diff --git a/.scala-steward.conf b/.scala-steward.conf index 68a6818fbc4..cf1ea2423e0 100644 --- a/.scala-steward.conf +++ b/.scala-steward.conf @@ -1,13 +1,12 @@ updates.pin = [ # https://github.com/apache/pekko/issues/2329 - { groupId = "io.aeron", version = "1.48." }, + { groupId = "io.aeron", version = "1.50." }, # Scala 3.3 is the latest LTS version { groupId = "org.scala-lang", artifactId = "scala3-library", version = "3.3." } # sbt-assembly 2.3 causes build issues (https://github.com/apache/pekko/pull/1744) { groupId = "com.eed3si9n", artifactId = "sbt-assembly", version = "2.2." } - # agrona major+minor version should match the one brought - # in by aeron - { groupId = "org.agrona", artifactId = "agrona", version = "2.2." } + # agrona major+minor version should match the one brought in by aeron + { groupId = "org.agrona", artifactId = "agrona", version = "2.4." } ] updates.ignore = [ diff --git a/docs/src/main/paradox/remoting-artery.md b/docs/src/main/paradox/remoting-artery.md index 39b22b07eb2..8e5b4861714 100644 --- a/docs/src/main/paradox/remoting-artery.md +++ b/docs/src/main/paradox/remoting-artery.md @@ -745,10 +745,10 @@ Given that Aeron jar files are in the classpath the standalone media driver can java io.aeron.driver.MediaDriver ``` -The needed classpath: +The classpath would be approximately (but you will need to fix up the version numbers): ``` -Agrona-0.5.4.jar:aeron-driver-1.0.1.jar:aeron-client-1.0.1.jar +agrona-2.4.1.jar:aeron-driver-1.50.4.jar:aeron-client-1.50.4.jar ``` You find those jar files on [Maven Central](https://search.maven.org/), or you can create a diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1ae09c74585..83ca91976ba 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -30,11 +30,11 @@ object Dependencies { val junit6Version = "6.0.3" val slf4jVersion = "2.0.17" // also update agrona version when updating aeron: - val aeronVersion = "1.48.10" + val aeronVersion = "1.50.4" // Use the major+minor agrona versions matching aeron at // https://github.com/aeron-io/aeron/blob/1.x.y/gradle/libs.versions.toml // (remember to also update the scala-steward pin) - val agronaVersion = "2.2.4" + val agronaVersion = "2.4.1" val nettyVersion = "4.2.13.Final" val logbackVersion = "1.5.32" diff --git a/remote-tests/src/test/scala/org/apache/pekko/remote/artery/ArteryFailedToBindSpec.scala b/remote-tests/src/test/scala/org/apache/pekko/remote/artery/ArteryFailedToBindSpec.scala index 103e2cb6187..b840aed53ff 100644 --- a/remote-tests/src/test/scala/org/apache/pekko/remote/artery/ArteryFailedToBindSpec.scala +++ b/remote-tests/src/test/scala/org/apache/pekko/remote/artery/ArteryFailedToBindSpec.scala @@ -58,7 +58,7 @@ class ArteryFailedToBindSpec extends AnyWordSpec with Matchers { } RARP(as).provider.transport.asInstanceOf[ArteryTransport].settings.Transport match { case ArterySettings.AeronUpd => - ex.getMessage should ===("Inbound Aeron channel is in errored state. See Aeron logs for details.") + ex.getMessage should startWith("Failed to create Aeron subscription") case ArterySettings.Tcp | ArterySettings.TlsTcp => ex.getMessage should startWith("Failed to bind TCP") } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/aeron/AeronSource.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/aeron/AeronSource.scala index b3bb950d017..17e1e5ad59d 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/aeron/AeronSource.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/aeron/AeronSource.scala @@ -19,12 +19,13 @@ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal import io.aeron.{ Aeron, FragmentAssembler, Subscription } -import io.aeron.exceptions.DriverTimeoutException +import io.aeron.exceptions.{ AeronException, DriverTimeoutException } import io.aeron.logbuffer.FragmentHandler import io.aeron.logbuffer.Header import org.agrona.DirectBuffer import org.apache.pekko +import pekko.remote.RemoteTransportException import pekko.stream.Attributes import pekko.stream.Outlet import pekko.stream.SourceShape @@ -106,7 +107,14 @@ private[remote] class AeronSource( override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val logic = new GraphStageLogic(shape) with OutHandler with AeronLifecycle with StageLogging { - private val subscription = aeron.addSubscription(channel, streamId) + private val subscription = try { + aeron.addSubscription(channel, streamId) + } catch { + case e: AeronException => + throw new RemoteTransportException( + s"Failed to create Aeron subscription for channel [$channel] and streamId [$streamId]", + e) + } private var backoffCount = spinning private var delegateTaskStartTime = 0L private var countBeforeDelegate = 0L diff --git a/remote/src/test/java/org/apache/pekko/remote/artery/aeron/AeronStat.java b/remote/src/test/java/org/apache/pekko/remote/artery/aeron/AeronStat.java index cf120538886..5251051508e 100644 --- a/remote/src/test/java/org/apache/pekko/remote/artery/aeron/AeronStat.java +++ b/remote/src/test/java/org/apache/pekko/remote/artery/aeron/AeronStat.java @@ -31,13 +31,10 @@ import java.io.File; import java.io.PrintStream; import java.nio.MappedByteBuffer; -import java.util.Date; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.regex.Pattern; import org.agrona.DirectBuffer; import org.agrona.IoUtil; -import org.agrona.concurrent.SigInt; import org.agrona.concurrent.status.CountersReader; /** @@ -144,73 +141,6 @@ public static CountersReader mapCounters(final File cncFile) { createCountersValuesBuffer(cncByteBuffer, cncMetaData)); } - public static void main(final String[] args) throws Exception { - Pattern typeFilter = null; - Pattern identityFilter = null; - Pattern sessionFilter = null; - Pattern streamFilter = null; - Pattern channelFilter = null; - - if (0 != args.length) { - checkForHelp(args); - - for (final String arg : args) { - final int equalsIndex = arg.indexOf('='); - if (-1 == equalsIndex) { - System.out.println("Arguments must be in name=pattern format: Invalid '" + arg + "'"); - return; - } - - final String argName = arg.substring(0, equalsIndex); - final String argValue = arg.substring(equalsIndex + 1); - - switch (argName) { - case COUNTER_TYPE_ID: - typeFilter = Pattern.compile(argValue); - break; - - case COUNTER_IDENTITY: - identityFilter = Pattern.compile(argValue); - break; - - case COUNTER_SESSION_ID: - sessionFilter = Pattern.compile(argValue); - break; - - case COUNTER_STREAM_ID: - streamFilter = Pattern.compile(argValue); - break; - - case COUNTER_CHANNEL: - channelFilter = Pattern.compile(argValue); - break; - - default: - System.out.println("Unrecognised argument: '" + arg + "'"); - return; - } - } - } - - final AeronStat aeronStat = - new AeronStat( - mapCounters(), typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter); - final AtomicBoolean running = new AtomicBoolean(true); - SigInt.register(() -> running.set(false)); - - while (running.get()) { - System.out.print("\033[H\033[2J"); - - System.out.format("%1$tH:%1$tM:%1$tS - Aeron Stat%n", new Date()); - System.out.println("========================="); - - aeronStat.print(System.out); - System.out.println("--"); - - Thread.sleep(ONE_SECOND); - } - } - public void print(final PrintStream out) { counters.forEach( (counterId, typeId, keyBuffer, label) -> { @@ -221,23 +151,6 @@ public void print(final PrintStream out) { }); } - private static void checkForHelp(final String[] args) { - for (final String arg : args) { - if ("-?".equals(arg) || "-h".equals(arg) || "-help".equals(arg)) { - System.out.format( - "Usage: [-Daeron.dir=] AeronStat%n" - + "\tfilter by optional regex patterns:%n" - + "\t[type=]%n" - + "\t[identity=]%n" - + "\t[sessionId=]%n" - + "\t[streamId=]%n" - + "\t[channel=]%n"); - - System.exit(0); - } - } - } - private boolean filter(final int typeId, final DirectBuffer keyBuffer) { if (!match(typeFilter, () -> Integer.toString(typeId))) { return false;