Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
updates.pin = [
# https://github.com/apache/pekko/issues/2329
{ groupId = "io.aeron", version = "1.48." },
{ groupId = "io.aeron", version = "1.50." },
# Scala 3.3 is the latest LTS version
{ groupId = "org.scala-lang", artifactId = "scala3-library", version = "3.3." }
# sbt-assembly 2.3 causes build issues (https://github.com/apache/pekko/pull/1744)
{ groupId = "com.eed3si9n", artifactId = "sbt-assembly", version = "2.2." }
# agrona major+minor version should match the one brought
# in by aeron
{ groupId = "org.agrona", artifactId = "agrona", version = "2.2." }
# agrona major+minor version should match the one brought in by aeron
{ groupId = "org.agrona", artifactId = "agrona", version = "2.4." }
]

updates.ignore = [
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/paradox/remoting-artery.md
Original file line number Diff line number Diff line change
Expand Up @@ -745,10 +745,10 @@ Given that Aeron jar files are in the classpath the standalone media driver can
java io.aeron.driver.MediaDriver
```

The needed classpath:
The classpath would be approximately (but you will need to fix up the version numbers):

```
Agrona-0.5.4.jar:aeron-driver-1.0.1.jar:aeron-client-1.0.1.jar
agrona-2.4.1.jar:aeron-driver-1.50.4.jar:aeron-client-1.50.4.jar
```

You find those jar files on [Maven Central](https://search.maven.org/), or you can create a
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ object Dependencies {
val junit6Version = "6.0.3"
val slf4jVersion = "2.0.17"
// also update agrona version when updating aeron:
val aeronVersion = "1.48.10"
val aeronVersion = "1.50.4"
// Use the major+minor agrona versions matching aeron at
// https://github.com/aeron-io/aeron/blob/1.x.y/gradle/libs.versions.toml
// (remember to also update the scala-steward pin)
val agronaVersion = "2.2.4"
val agronaVersion = "2.4.1"
val nettyVersion = "4.2.13.Final"
val logbackVersion = "1.5.32"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ArteryFailedToBindSpec extends AnyWordSpec with Matchers {
}
RARP(as).provider.transport.asInstanceOf[ArteryTransport].settings.Transport match {
case ArterySettings.AeronUpd =>
ex.getMessage should ===("Inbound Aeron channel is in errored state. See Aeron logs for details.")
ex.getMessage should startWith("Failed to create Aeron subscription")
case ArterySettings.Tcp | ArterySettings.TlsTcp =>
ex.getMessage should startWith("Failed to bind TCP")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal

import io.aeron.{ Aeron, FragmentAssembler, Subscription }
import io.aeron.exceptions.DriverTimeoutException
import io.aeron.exceptions.{ AeronException, DriverTimeoutException }
import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer

import org.apache.pekko
import pekko.remote.RemoteTransportException
import pekko.stream.Attributes
import pekko.stream.Outlet
import pekko.stream.SourceShape
Expand Down Expand Up @@ -106,7 +107,14 @@ private[remote] class AeronSource(
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val logic = new GraphStageLogic(shape) with OutHandler with AeronLifecycle with StageLogging {

private val subscription = aeron.addSubscription(channel, streamId)
private val subscription = try {
aeron.addSubscription(channel, streamId)
} catch {
case e: AeronException =>
throw new RemoteTransportException(
s"Failed to create Aeron subscription for channel [$channel] and streamId [$streamId]",
e)
}
private var backoffCount = spinning
private var delegateTaskStartTime = 0L
private var countBeforeDelegate = 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,10 @@
import java.io.File;
import java.io.PrintStream;
import java.nio.MappedByteBuffer;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.agrona.DirectBuffer;
import org.agrona.IoUtil;
import org.agrona.concurrent.SigInt;
import org.agrona.concurrent.status.CountersReader;

/**
Expand Down Expand Up @@ -144,73 +141,6 @@ public static CountersReader mapCounters(final File cncFile) {
createCountersValuesBuffer(cncByteBuffer, cncMetaData));
}

public static void main(final String[] args) throws Exception {
Pattern typeFilter = null;
Pattern identityFilter = null;
Pattern sessionFilter = null;
Pattern streamFilter = null;
Pattern channelFilter = null;

if (0 != args.length) {
checkForHelp(args);

for (final String arg : args) {
final int equalsIndex = arg.indexOf('=');
if (-1 == equalsIndex) {
System.out.println("Arguments must be in name=pattern format: Invalid '" + arg + "'");
return;
}

final String argName = arg.substring(0, equalsIndex);
final String argValue = arg.substring(equalsIndex + 1);

switch (argName) {
case COUNTER_TYPE_ID:
typeFilter = Pattern.compile(argValue);
break;

case COUNTER_IDENTITY:
identityFilter = Pattern.compile(argValue);
break;

case COUNTER_SESSION_ID:
sessionFilter = Pattern.compile(argValue);
break;

case COUNTER_STREAM_ID:
streamFilter = Pattern.compile(argValue);
break;

case COUNTER_CHANNEL:
channelFilter = Pattern.compile(argValue);
break;

default:
System.out.println("Unrecognised argument: '" + arg + "'");
return;
}
}
}

final AeronStat aeronStat =
new AeronStat(
mapCounters(), typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter);
final AtomicBoolean running = new AtomicBoolean(true);
SigInt.register(() -> running.set(false));

while (running.get()) {
System.out.print("\033[H\033[2J");

System.out.format("%1$tH:%1$tM:%1$tS - Aeron Stat%n", new Date());
System.out.println("=========================");

aeronStat.print(System.out);
System.out.println("--");

Thread.sleep(ONE_SECOND);
}
}

public void print(final PrintStream out) {
counters.forEach(
(counterId, typeId, keyBuffer, label) -> {
Expand All @@ -221,23 +151,6 @@ public void print(final PrintStream out) {
});
}

private static void checkForHelp(final String[] args) {
for (final String arg : args) {
if ("-?".equals(arg) || "-h".equals(arg) || "-help".equals(arg)) {
System.out.format(
"Usage: [-Daeron.dir=<directory containing CnC file>] AeronStat%n"
+ "\tfilter by optional regex patterns:%n"
+ "\t[type=<pattern>]%n"
+ "\t[identity=<pattern>]%n"
+ "\t[sessionId=<pattern>]%n"
+ "\t[streamId=<pattern>]%n"
+ "\t[channel=<pattern>]%n");

System.exit(0);
}
}
}

private boolean filter(final int typeId, final DirectBuffer keyBuffer) {
if (!match(typeFilter, () -> Integer.toString(typeId))) {
return false;
Expand Down