diff --git a/CONFORMANCE.md b/CONFORMANCE.md index c055e84..c0f3767 100644 --- a/CONFORMANCE.md +++ b/CONFORMANCE.md @@ -139,7 +139,6 @@ works" is decomposed into multiple binary rows. - HTTP/2 + QUIC transports - mTLS / OAuth2 auth schemes -- stdio newline-delimited JSON transport (`MemoryTransport` covers in-process use) - §15.6 trust elevation - Quarkus and Helidon middleware (Phase 5 deferred them; `arcp-runtime-jetty`, `arcp-middleware-jakarta`, `arcp-middleware-spring-boot`, and diff --git a/arcp-client/src/main/java/dev/arcp/client/ArcpClient.java b/arcp-client/src/main/java/dev/arcp/client/ArcpClient.java index cb81a24..ea4fefb 100644 --- a/arcp-client/src/main/java/dev/arcp/client/ArcpClient.java +++ b/arcp-client/src/main/java/dev/arcp/client/ArcpClient.java @@ -49,6 +49,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.ScheduledExecutorService; @@ -69,6 +71,13 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber safeFeatureCopy(Set features) { + if (features == null || features.isEmpty()) { + return EnumSet.noneOf(Feature.class); + } + return EnumSet.copyOf(features); + } + private final Transport transport; private final ObjectMapper mapper; private final ClientInfo info; @@ -88,6 +97,12 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber heartbeatWatchdog; private final ConcurrentHashMap> liveSubscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap liveExecutors = new ConcurrentHashMap<>(); + private final boolean ownedScheduler; + + // Reserved for future use by tests asserting FIFO insertion order of pending submits. + @SuppressWarnings("unused") + private final ConcurrentLinkedDeque pendingSubmitOrder = new ConcurrentLinkedDeque<>(); @SuppressWarnings("unused") private Flow.@Nullable Subscription subscription; @@ -103,16 +118,19 @@ private ArcpClient(Builder b) { this.mapper = b.mapper != null ? b.mapper : ArcpMapper.shared(); this.info = b.info; this.auth = b.auth; - this.requestedFeatures = EnumSet.copyOf(b.features); + this.requestedFeatures = safeFeatureCopy(b.features); this.autoAck = b.autoAck; this.ackInterval = b.ackInterval; - this.scheduler = - b.scheduler != null - ? b.scheduler - : Executors.newScheduledThreadPool( - 1, - r -> - Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r)); + if (b.scheduler != null) { + this.scheduler = b.scheduler; + this.ownedScheduler = false; + } else { + this.scheduler = + Executors.newScheduledThreadPool( + 1, + r -> Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r)); + this.ownedScheduler = true; + } this.resumeToken = b.resumeToken; this.lastEventSeq = b.lastEventSeq; } @@ -130,11 +148,16 @@ public CompletableFuture connect() { return sessionFuture; } - public Session connect(Duration timeout) throws InterruptedException, TimeoutException { + public Session connect(Duration timeout) + throws InterruptedException, TimeoutException, ArcpException { try { return connect().get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (java.util.concurrent.ExecutionException e) { - throw new IllegalStateException("connect failed", e.getCause()); + Throwable cause = e.getCause() != null ? e.getCause() : e; + if (cause instanceof ArcpException ax) { + throw ax; + } + throw new IllegalStateException("connect failed", cause); } } @@ -144,16 +167,35 @@ public JobHandle submit(JobSubmit submit) { public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) { Outstanding o = new Outstanding(); - // Pre-register the outstanding job under a request-id keyed slot so that - // when JobAccepted arrives we associate the job_id. MessageId requestId = MessageId.generate(); - pendingSubmits.put(requestId, o); - send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId); + // The put-then-send pair must be atomic w.r.t. other submits so that the + // FIFO order of pendingSubmits matches the wire order observed by the + // runtime; that ordering is how handleAccepted correlates JobAccepted + // back to the right pending submit (the runtime does not echo our + // request id on job.accepted). + submitLock.lock(); + try { + pendingSubmits.add(new PendingSubmit(requestId, o)); + send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId); + } finally { + submitLock.unlock(); + } return o.handleFuture.join(); } - public Page listJobs(@Nullable JobFilter filter) { - SessionListJobs req = new SessionListJobs(filter, null, null); + public Page listJobs(@Nullable JobFilter filter) + throws InterruptedException, TimeoutException, ArcpException { + return listJobs(filter, null, null); + } + + /** + * List jobs with optional pagination. Supply {@code cursor} from the previous {@link Page} to + * continue, or {@code null} to fetch the first page. {@code limit} caps the page size. + */ + public Page listJobs( + @Nullable JobFilter filter, @Nullable Integer limit, @Nullable String cursor) + throws InterruptedException, TimeoutException, ArcpException { + SessionListJobs req = new SessionListJobs(filter, limit, cursor); MessageId reqId = MessageId.generate(); CompletableFuture fut = new CompletableFuture<>(); listRequests.put(reqId, fut); @@ -161,23 +203,66 @@ public Page listJobs(@Nullable JobFilter filter) { try { SessionJobs response = fut.get(10, TimeUnit.SECONDS); return new Page<>(response.jobs(), response.nextCursor()); - } catch (java.util.concurrent.ExecutionException | InterruptedException | TimeoutException e) { - throw new RuntimeException("list_jobs failed", e); + } catch (InterruptedException e) { + listRequests.remove(reqId); + Thread.currentThread().interrupt(); + throw e; + } catch (TimeoutException e) { + listRequests.remove(reqId); + throw e; + } catch (java.util.concurrent.ExecutionException e) { + listRequests.remove(reqId); + Throwable cause = e.getCause() != null ? e.getCause() : e; + if (cause instanceof ArcpException ax) { + throw ax; + } + throw new IllegalStateException("list_jobs failed", cause); } } public Flow.Publisher subscribe(JobId jobId, SubscribeOptions options) { + java.util.concurrent.atomic.AtomicBoolean inserted = + new java.util.concurrent.atomic.AtomicBoolean(false); SubmissionPublisher pub = liveSubscribers.computeIfAbsent( jobId, - k -> new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024)); - JobSubscribe sub = - new JobSubscribe( - jobId, options.history() ? options.fromEventSeq() : null, options.history()); - send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null); + k -> { + inserted.set(true); + ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); + liveExecutors.put(jobId, exec); + return new SubmissionPublisher<>(exec, 1024); + }); + if (inserted.get()) { + JobSubscribe sub = + new JobSubscribe( + jobId, options.history() ? options.fromEventSeq() : null, options.history()); + send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null); + } return pub; } + /** + * Locally unsubscribe from job events and notify the runtime via {@code job.unsubscribe}. Closes + * the local {@link Flow.Publisher} so any downstream subscribers see {@code onComplete}. + */ + public void unsubscribe(JobId jobId) { + SubmissionPublisher pub = liveSubscribers.remove(jobId); + if (pub != null) { + pub.close(); + } + ExecutorService exec = liveExecutors.remove(jobId); + if (exec != null) { + exec.shutdown(); + } + if (!closed) { + try { + send(Message.Type.JOB_UNSUBSCRIBE, new JobUnsubscribe(jobId), sessionId, null, jobId, null); + } catch (RuntimeException ignored) { + // best-effort + } + } + } + public void ack(long lastProcessedSeq) { send(Message.Type.SESSION_ACK, new SessionAck(lastProcessedSeq), sessionId, null, null, null); } @@ -204,12 +289,18 @@ public void close() { for (var pub : liveSubscribers.values()) { pub.close(); } + for (ExecutorService exec : liveExecutors.values()) { + exec.shutdown(); + } + liveExecutors.clear(); try { transport.close(); } catch (RuntimeException ignored) { // best-effort close } - scheduler.shutdownNow(); + if (ownedScheduler) { + scheduler.shutdownNow(); + } } /** Returns the highest event sequence number seen from the server, or -1 if none. */ @@ -248,17 +339,53 @@ public void onNext(Envelope envelope) { @Override public void onError(Throwable throwable) { - sessionFuture.completeExceptionally(throwable); + failAll(throwable); } @Override public void onComplete() { if (!sessionFuture.isDone()) { - sessionFuture.completeExceptionally( - new IllegalStateException("transport closed before welcome")); + failAll(new IllegalStateException("transport closed before welcome")); + } else { + // Transport closed after welcome — fail anything still in flight. + failAll(new IllegalStateException("transport closed")); } } + /** + * Fail every outstanding future and complete every live subscription publisher exceptionally. + * Called on transport error, unexpected transport close, and heartbeat-loss close so a caller + * blocked on {@link #submit}, {@link #listJobs}, or {@link JobHandle#result()} does not wait + * forever. + */ + private void failAll(Throwable cause) { + if (!sessionFuture.isDone()) { + sessionFuture.completeExceptionally(cause); + } + for (PendingSubmit head; (head = pendingSubmits.pollFirst()) != null; ) { + head.outstanding().handleFuture.completeExceptionally(cause); + } + for (java.util.Map.Entry e : outstanding.entrySet()) { + Outstanding o = e.getValue(); + if (!o.resultFuture.isDone()) { + o.resultFuture.completeExceptionally(cause); + } + o.events.close(); + } + outstanding.clear(); + for (java.util.Map.Entry> e : + listRequests.entrySet()) { + if (!e.getValue().isDone()) { + e.getValue().completeExceptionally(cause); + } + } + listRequests.clear(); + for (SubmissionPublisher pub : liveSubscribers.values()) { + pub.closeExceptionally(cause); + } + liveSubscribers.clear(); + } + private void dispatch(Envelope envelope) { Message m; try { @@ -338,25 +465,23 @@ private void watchHeartbeat(long intervalMs) { long elapsed = System.currentTimeMillis() - lastInboundMillis.get(); if (elapsed > intervalMs * 2) { log.info("client observed heartbeat loss; closing session"); + failAll(new IllegalStateException("heartbeat lost")); close(); } } - private final ConcurrentHashMap pendingSubmits = - new ConcurrentHashMap<>(); + private record PendingSubmit(MessageId requestId, Outstanding outstanding) {} + + private final ConcurrentLinkedDeque pendingSubmits = new ConcurrentLinkedDeque<>(); + private final java.util.concurrent.locks.ReentrantLock submitLock = + new java.util.concurrent.locks.ReentrantLock(); private void handleAccepted(Envelope envelope, JobAccepted accepted) { - // We associate by traversing pending submits in insertion order; the - // runtime guarantees ordering per-session, so the oldest pending submit - // is the one being acknowledged. - MessageId match = pendingSubmits.keySet().stream().findFirst().orElse(null); - if (match == null) { - return; - } - Outstanding o = pendingSubmits.remove(match); - if (o == null) { + PendingSubmit head = pendingSubmits.pollFirst(); + if (head == null) { return; } + Outstanding o = head.outstanding(); o.jobId = accepted.jobId(); outstanding.put(accepted.jobId(), o); o.handleFuture.complete(new ClientJobHandle(accepted, o)); @@ -395,14 +520,11 @@ private void handleError(Envelope envelope, JobError err) { JobId jid = envelope.jobId(); Outstanding o = jid != null ? outstanding.remove(jid) : null; if (o == null) { - // Top-level (unassigned) error: drop the oldest pending submit. - MessageId first = pendingSubmits.keySet().stream().findFirst().orElse(null); - if (first != null) { - Outstanding pending = pendingSubmits.remove(first); - if (pending != null) { - ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message())); - pending.handleFuture.completeExceptionally(ex); - } + // Top-level (unassigned) error: fail the oldest pending submit. + PendingSubmit head = pendingSubmits.pollFirst(); + if (head != null) { + ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message())); + head.outstanding().handleFuture.completeExceptionally(ex); } return; } @@ -536,7 +658,7 @@ public Builder bearer(String token) { } public Builder features(Set features) { - this.features = EnumSet.copyOf(features); + this.features = safeFeatureCopy(features); return this; } diff --git a/arcp-client/src/main/java/dev/arcp/client/Page.java b/arcp-client/src/main/java/dev/arcp/client/Page.java index eef91ba..3ae89c5 100644 --- a/arcp-client/src/main/java/dev/arcp/client/Page.java +++ b/arcp-client/src/main/java/dev/arcp/client/Page.java @@ -5,6 +5,10 @@ import org.jspecify.annotations.Nullable; public record Page(List items, @Nullable String nextCursor) { + public Page { + items = items == null ? List.of() : List.copyOf(items); + } + public static Page empty() { return new Page<>(List.of(), null); } diff --git a/arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java b/arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java index 5638cd3..86620ba 100644 --- a/arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java +++ b/arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java @@ -1,8 +1,11 @@ package dev.arcp.client; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; @@ -13,16 +16,22 @@ * Multicast {@link Flow.Publisher} that buffers every emission and replays the buffer to each newly * attached subscriber before forwarding live deliveries. Used by {@link JobHandle#events()} so * callers subscribing after a job has already emitted still see the full event history. + * + *

Each {@code subscribe} delivers {@code onSubscribe} as its first signal (Reactive Streams + * §1.9): the replay snapshot and any live items that race in during replay are queued until the + * downstream subscriber has acknowledged the subscription. */ -final class ReplayingPublisher implements Flow.Publisher { +final class ReplayingPublisher implements Flow.Publisher, AutoCloseable { private final List buffer = new CopyOnWriteArrayList<>(); private final SubmissionPublisher live; + private final ExecutorService liveExecutor; private final ReentrantLock lock = new ReentrantLock(); private volatile boolean closed; ReplayingPublisher() { - this.live = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.liveExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.live = new SubmissionPublisher<>(liveExecutor, 1024); } // Lock spans live.submit so concurrent producers preserve buffer/live order; @@ -37,7 +46,8 @@ void submit(T item) { } } - void close() { + @Override + public void close() { lock.lock(); try { if (closed) { @@ -48,6 +58,7 @@ void close() { } finally { lock.unlock(); } + liveExecutor.shutdown(); } @Override @@ -55,9 +66,10 @@ public void subscribe(Flow.Subscriber downstream) { final List snapshot; final boolean wasClosed; AtomicBoolean cancelled = new AtomicBoolean(false); + AtomicBoolean replayDone = new AtomicBoolean(false); + Deque pending = new ArrayDeque<>(); + Object pendingLock = new Object(); - // Hold the publisher lock while snapshotting AND attaching to live so - // no submit() can interleave between the two and produce a gap. lock.lock(); try { snapshot = new ArrayList<>(buffer); @@ -72,9 +84,18 @@ public void onSubscribe(Flow.Subscription s) { @Override public void onNext(T item) { - if (!cancelled.get()) { - downstream.onNext(item); + if (cancelled.get()) { + return; + } + if (!replayDone.get()) { + synchronized (pendingLock) { + if (!replayDone.get()) { + pending.addLast(item); + return; + } + } } + downstream.onNext(item); } @Override @@ -115,6 +136,17 @@ public void cancel() { } downstream.onNext(item); } + // Drain any items the live forwarder buffered during replay, then flip the + // flag so subsequent live deliveries flow straight through. + synchronized (pendingLock) { + while (!pending.isEmpty()) { + if (cancelled.get()) { + return; + } + downstream.onNext(pending.pollFirst()); + } + replayDone.set(true); + } if (wasClosed) { downstream.onComplete(); } diff --git a/arcp-client/src/main/java/dev/arcp/client/Session.java b/arcp-client/src/main/java/dev/arcp/client/Session.java index bd6ca73..8bf1336 100644 --- a/arcp-client/src/main/java/dev/arcp/client/Session.java +++ b/arcp-client/src/main/java/dev/arcp/client/Session.java @@ -8,9 +8,19 @@ import java.util.Set; import org.jspecify.annotations.Nullable; +/** + * Snapshot of a successfully handshaken ARCP session. Collection components are defensively copied + * into immutable views so callers cannot mutate the session after construction. + */ public record Session( SessionId sessionId, Set negotiatedFeatures, @Nullable String resumeToken, @Nullable Duration heartbeatInterval, - List availableAgents) {} + List availableAgents) { + + public Session { + negotiatedFeatures = Set.copyOf(negotiatedFeatures); + availableAgents = List.copyOf(availableAgents); + } +} diff --git a/arcp-client/src/main/java/dev/arcp/client/WebSocketTransport.java b/arcp-client/src/main/java/dev/arcp/client/WebSocketTransport.java index d44af67..82345cb 100644 --- a/arcp-client/src/main/java/dev/arcp/client/WebSocketTransport.java +++ b/arcp-client/src/main/java/dev/arcp/client/WebSocketTransport.java @@ -14,10 +14,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,15 +33,23 @@ public final class WebSocketTransport implements Transport { private static final Logger log = LoggerFactory.getLogger(WebSocketTransport.class); - private final WebSocket socket; + private volatile @Nullable WebSocket socket; private final ObjectMapper mapper; private final SubmissionPublisher inbound; + private final ExecutorService inboundExecutor; private final StringBuilder partial = new StringBuilder(); + private final ReentrantLock writeLock = new ReentrantLock(); + private final @Nullable HttpClient ownedHttpClient; - private WebSocketTransport(WebSocket socket, ObjectMapper mapper) { - this.socket = socket; + private WebSocketTransport(ObjectMapper mapper, @Nullable HttpClient ownedHttpClient) { this.mapper = mapper; - this.inbound = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.ownedHttpClient = ownedHttpClient; + this.inboundExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.inbound = new SubmissionPublisher<>(inboundExecutor, 1024); + } + + void attachSocket(WebSocket socket) { + this.socket = socket; } /** Open a WebSocket connection to {@code uri} and return a connected transport. */ @@ -51,11 +61,11 @@ public static WebSocketTransport connect( URI uri, Map headers, ObjectMapper mapper, Duration timeout) throws InterruptedException { HttpClient httpClient = HttpClient.newHttpClient(); + WebSocketTransport transport = new WebSocketTransport(mapper, httpClient); WebSocket.Builder builder = httpClient.newWebSocketBuilder(); for (var entry : headers.entrySet()) { builder.header(entry.getKey(), entry.getValue()); } - var futureSocket = new java.util.concurrent.atomic.AtomicReference(); CompletableFuture stage = builder.buildAsync( uri, @@ -68,10 +78,7 @@ public void onOpen(WebSocket webSocket) { @Override public @Nullable CompletionStage onText( WebSocket webSocket, CharSequence data, boolean last) { - WebSocketTransport t = futureSocket.get(); - if (t != null) { - t.handleText(data, last); - } + transport.handleText(data, last); webSocket.request(1); return null; } @@ -79,19 +86,13 @@ public void onOpen(WebSocket webSocket) { @Override public @Nullable CompletionStage onClose( WebSocket webSocket, int statusCode, String reason) { - WebSocketTransport t = futureSocket.get(); - if (t != null) { - t.inbound.close(); - } + transport.inbound.close(); return null; } @Override public void onError(WebSocket webSocket, Throwable error) { - WebSocketTransport t = futureSocket.get(); - if (t != null) { - t.inbound.closeExceptionally(error); - } + transport.inbound.closeExceptionally(error); } }); WebSocket ws; @@ -100,12 +101,21 @@ public void onError(WebSocket webSocket, Throwable error) { } catch (java.util.concurrent.ExecutionException e) { Throwable cause = e.getCause() instanceof CompletionException ce ? ce.getCause() : e.getCause(); + try { + httpClient.close(); + } catch (RuntimeException ignored) { + // best-effort + } throw new IllegalStateException("WebSocket connect failed: " + cause, cause); } catch (java.util.concurrent.TimeoutException e) { + try { + httpClient.close(); + } catch (RuntimeException ignored) { + // best-effort + } throw new IllegalStateException("WebSocket connect timed out", e); } - WebSocketTransport transport = new WebSocketTransport(ws, mapper); - futureSocket.set(transport); + transport.attachSocket(ws); return transport; } @@ -126,9 +136,14 @@ private void handleText(CharSequence data, boolean last) { @Override public void send(Envelope envelope) { + WebSocket ws = socket; + if (ws == null) { + throw new IllegalStateException("WebSocketTransport.send called before socket attached"); + } + writeLock.lock(); try { String json = mapper.writeValueAsString(envelope); - socket.sendText(json, true).get(5, TimeUnit.SECONDS); + ws.sendText(json, true).get(5, TimeUnit.SECONDS); } catch (IOException e) { throw new UncheckedIOException(e); } catch (InterruptedException e) { @@ -136,6 +151,8 @@ public void send(Envelope envelope) { throw new IllegalStateException("interrupted while sending", e); } catch (java.util.concurrent.ExecutionException | java.util.concurrent.TimeoutException e) { throw new IllegalStateException("send failed", e); + } finally { + writeLock.unlock(); } } @@ -146,14 +163,25 @@ public Flow.Publisher incoming() { @Override public void close() { - try { - socket.sendClose(WebSocket.NORMAL_CLOSURE, "bye").get(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (java.util.concurrent.ExecutionException - | java.util.concurrent.TimeoutException ignored) { - // best-effort close + WebSocket ws = socket; + if (ws != null) { + try { + ws.sendClose(WebSocket.NORMAL_CLOSURE, "bye").get(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (java.util.concurrent.ExecutionException + | java.util.concurrent.TimeoutException ignored) { + // best-effort close + } } inbound.close(); + inboundExecutor.shutdown(); + if (ownedHttpClient != null) { + try { + ownedHttpClient.close(); + } catch (RuntimeException ignored) { + // best-effort + } + } } } diff --git a/arcp-client/src/test/java/dev/arcp/client/IdempotencyTest.java b/arcp-client/src/test/java/dev/arcp/client/IdempotencyTest.java index 7502d62..0caae54 100644 --- a/arcp-client/src/test/java/dev/arcp/client/IdempotencyTest.java +++ b/arcp-client/src/test/java/dev/arcp/client/IdempotencyTest.java @@ -66,8 +66,8 @@ void conflictingPayloadYieldsDuplicateKey() throws Exception { } private static ArcpClient paired(ArcpRuntime runtime) { - MemoryTransport[] pair = MemoryTransport.pair(); - runtime.accept(pair[0]); - return ArcpClient.builder(pair[1]).build(); + MemoryTransport.Pair pair = MemoryTransport.pair(); + runtime.accept(pair.runtime()); + return ArcpClient.builder(pair.client()).build(); } } diff --git a/arcp-client/src/test/java/dev/arcp/client/RuntimeErrorMappingTest.java b/arcp-client/src/test/java/dev/arcp/client/RuntimeErrorMappingTest.java index d205ee0..c902bed 100644 --- a/arcp-client/src/test/java/dev/arcp/client/RuntimeErrorMappingTest.java +++ b/arcp-client/src/test/java/dev/arcp/client/RuntimeErrorMappingTest.java @@ -17,7 +17,7 @@ class RuntimeErrorMappingTest { @Test void permissionDeniedFromAgentAuthorizationReachesClient() throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -28,9 +28,9 @@ void permissionDeniedFromAgentAuthorizationReachesClient() throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = client.submit( diff --git a/arcp-client/src/test/java/dev/arcp/client/SmokeRoundTripTest.java b/arcp-client/src/test/java/dev/arcp/client/SmokeRoundTripTest.java index 8d09713..2d26ae1 100644 --- a/arcp-client/src/test/java/dev/arcp/client/SmokeRoundTripTest.java +++ b/arcp-client/src/test/java/dev/arcp/client/SmokeRoundTripTest.java @@ -22,9 +22,9 @@ class SmokeRoundTripTest { @Test void inProcessJobRoundTrip() throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); - MemoryTransport runtimeSide = pair[0]; - MemoryTransport clientSide = pair[1]; + MemoryTransport.Pair pair = MemoryTransport.pair(); + MemoryTransport runtimeSide = pair.runtime(); + MemoryTransport clientSide = pair.client(); ArcpRuntime runtime = ArcpRuntime.builder() diff --git a/arcp-client/src/test/java/dev/arcp/client/SubscribeReplayTest.java b/arcp-client/src/test/java/dev/arcp/client/SubscribeReplayTest.java index e790efa..8df12e7 100644 --- a/arcp-client/src/test/java/dev/arcp/client/SubscribeReplayTest.java +++ b/arcp-client/src/test/java/dev/arcp/client/SubscribeReplayTest.java @@ -41,9 +41,9 @@ void historyReplaysBufferedEvents() throws Exception { }) .build(); - MemoryTransport[] pair = MemoryTransport.pair(); - runtime.accept(pair[0]); - try (ArcpClient submitter = ArcpClient.builder(pair[1]).build()) { + MemoryTransport.Pair pair = MemoryTransport.pair(); + runtime.accept(pair.runtime()); + try (ArcpClient submitter = ArcpClient.builder(pair.client()).build()) { submitter.connect(Duration.ofSeconds(5)); JobHandle handle = submitter.submit( @@ -87,8 +87,8 @@ public void onComplete() {} @Test void secondSessionCanReplayCompletedJobHistory() throws Exception { - MemoryTransport[] submitPair = MemoryTransport.pair(); - MemoryTransport[] replayPair = MemoryTransport.pair(); + MemoryTransport.Pair submitPair = MemoryTransport.pair(); + MemoryTransport.Pair replayPair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -100,11 +100,11 @@ void secondSessionCanReplayCompletedJobHistory() throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(submitPair[0]); - runtime.accept(replayPair[0]); + runtime.accept(submitPair.runtime()); + runtime.accept(replayPair.runtime()); JobHandle handle; - try (ArcpClient submitter = ArcpClient.builder(submitPair[1]).bearer("shared").build()) { + try (ArcpClient submitter = ArcpClient.builder(submitPair.client()).bearer("shared").build()) { submitter.connect(Duration.ofSeconds(5)); handle = submitter.submit( @@ -113,7 +113,7 @@ void secondSessionCanReplayCompletedJobHistory() throws Exception { } CopyOnWriteArrayList replayed = new CopyOnWriteArrayList<>(); - try (ArcpClient replayer = ArcpClient.builder(replayPair[1]).bearer("shared").build()) { + try (ArcpClient replayer = ArcpClient.builder(replayPair.client()).bearer("shared").build()) { replayer.connect(Duration.ofSeconds(5)); replayer .subscribe(handle.jobId(), SubscribeOptions.withHistory(0L)) diff --git a/arcp-client/src/test/java/dev/arcp/client/credentials/CredentialProvisioningIntegrationTest.java b/arcp-client/src/test/java/dev/arcp/client/credentials/CredentialProvisioningIntegrationTest.java index e664407..bf6e9af 100644 --- a/arcp-client/src/test/java/dev/arcp/client/credentials/CredentialProvisioningIntegrationTest.java +++ b/arcp-client/src/test/java/dev/arcp/client/credentials/CredentialProvisioningIntegrationTest.java @@ -38,10 +38,10 @@ class CredentialProvisioningIntegrationTest { void credentialsSurfaceAndRevokeOnSuccess() throws Exception { CountingProvisioner provisioner = new CountingProvisioner(); InMemoryCredentialRevocationStore store = new InMemoryCredentialRevocationStore(); - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = runtime( - pair[0], + pair.runtime(), provisioner, store, "agent", @@ -53,7 +53,7 @@ void credentialsSurfaceAndRevokeOnSuccess() throws Exception { }); try (runtime; - ArcpClient client = ArcpClient.builder(pair[1]).build()) { + ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = client.submit( @@ -92,10 +92,10 @@ void credentialsRevokeOnCancelAndError() throws Exception { void credentialsRevokeOnLeaseTimeout() throws Exception { CountingProvisioner provisioner = new CountingProvisioner(); InMemoryCredentialRevocationStore store = new InMemoryCredentialRevocationStore(); - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = runtime( - pair[0], + pair.runtime(), provisioner, store, "agent", @@ -106,7 +106,7 @@ void credentialsRevokeOnLeaseTimeout() throws Exception { }); try (runtime; - ArcpClient client = ArcpClient.builder(pair[1]).build()) { + ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = client.submit( @@ -146,17 +146,17 @@ public CompletableFuture revoke(CredentialId id) { return CompletableFuture.completedFuture(null); } }; - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .credentialProvisioner(provisioner) .credentialRevocationStore(new InMemoryCredentialRevocationStore()) .agent("agent", "1.0.0", (input, ctx) -> JobOutcome.Success.inline(input.payload())) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); try (runtime; - ArcpClient client = ArcpClient.builder(pair[1]).build()) { + ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); assertThatThrownBy( () -> @@ -178,11 +178,11 @@ private static void assertTerminalRevokes(dev.arcp.runtime.agent.Agent agent, bo throws Exception { CountingProvisioner provisioner = new CountingProvisioner(); InMemoryCredentialRevocationStore store = new InMemoryCredentialRevocationStore(); - MemoryTransport[] pair = MemoryTransport.pair(); - ArcpRuntime runtime = runtime(pair[0], provisioner, store, "agent", "1.0.0", agent); + MemoryTransport.Pair pair = MemoryTransport.pair(); + ArcpRuntime runtime = runtime(pair.runtime(), provisioner, store, "agent", "1.0.0", agent); try (runtime; - ArcpClient client = ArcpClient.builder(pair[1]).build()) { + ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = client.submit( diff --git a/arcp-core/src/main/java/dev/arcp/core/auth/BearerVerifier.java b/arcp-core/src/main/java/dev/arcp/core/auth/BearerVerifier.java index 897837a..e644f39 100644 --- a/arcp-core/src/main/java/dev/arcp/core/auth/BearerVerifier.java +++ b/arcp-core/src/main/java/dev/arcp/core/auth/BearerVerifier.java @@ -1,29 +1,55 @@ package dev.arcp.core.auth; import dev.arcp.core.error.UnauthenticatedException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HexFormat; /** SPI for verifying §6.1 bearer tokens at the handshake seam. */ @FunctionalInterface public interface BearerVerifier { Principal verify(String token) throws UnauthenticatedException; - /** Static-token verifier; suitable for development and tests. */ + /** + * Static-token verifier that compares the supplied bearer token to {@code expected} in + * constant-time using {@link MessageDigest#isEqual(byte[], byte[])}. Suitable for production use + * with static credentials. + */ static BearerVerifier staticToken(String expected, Principal principal) { + byte[] expectedBytes = expected.getBytes(StandardCharsets.UTF_8); return token -> { - if (!expected.equals(token)) { + if (token == null) { + throw new UnauthenticatedException("invalid bearer token"); + } + byte[] tokenBytes = token.getBytes(StandardCharsets.UTF_8); + if (tokenBytes.length != expectedBytes.length + || !MessageDigest.isEqual(expectedBytes, tokenBytes)) { throw new UnauthenticatedException("invalid bearer token"); } return principal; }; } - /** Accept any non-empty token, returning a principal derived from its hash. */ + /** + * Accept any non-empty token, returning a principal derived from a SHA-256 digest of the token + * bytes (first 16 bytes hex-encoded). Avoids the principal-collision risk of using {@code + * String#hashCode}. + */ static BearerVerifier acceptAny() { return token -> { if (token == null || token.isEmpty()) { throw new UnauthenticatedException("empty bearer token"); } - return new Principal("bearer:" + Integer.toHexString(token.hashCode())); + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + byte[] digest = md.digest(token.getBytes(StandardCharsets.UTF_8)); + byte[] head = new byte[16]; + System.arraycopy(digest, 0, head, 0, 16); + return new Principal("bearer:" + HexFormat.of().formatHex(head)); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 unavailable", e); + } }; } } diff --git a/arcp-core/src/main/java/dev/arcp/core/capabilities/Feature.java b/arcp-core/src/main/java/dev/arcp/core/capabilities/Feature.java index 74dbc71..4891662 100644 --- a/arcp-core/src/main/java/dev/arcp/core/capabilities/Feature.java +++ b/arcp-core/src/main/java/dev/arcp/core/capabilities/Feature.java @@ -2,9 +2,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; -import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; +/** + * Enumeration of all optional ARCP protocol features. Each constant carries the canonical {@code + * wire} string used in capability negotiation; unknown wire strings are surfaced through {@link + * #fromWire(String)} as {@link Optional#empty()}. + */ public enum Feature { HEARTBEAT("heartbeat"), ACK("ack"), @@ -29,8 +36,18 @@ public String wire() { return wire; } + private static final Map BY_WIRE; + + static { + Map m = new HashMap<>(); + for (Feature f : values()) { + m.put(f.wire, f); + } + BY_WIRE = Collections.unmodifiableMap(m); + } + @JsonCreator public static Optional fromWire(String wire) { - return Arrays.stream(values()).filter(f -> f.wire.equals(wire)).findFirst(); + return Optional.ofNullable(BY_WIRE.get(wire)); } } diff --git a/arcp-core/src/main/java/dev/arcp/core/events/EventBody.java b/arcp-core/src/main/java/dev/arcp/core/events/EventBody.java index f6822de..62ea5e2 100644 --- a/arcp-core/src/main/java/dev/arcp/core/events/EventBody.java +++ b/arcp-core/src/main/java/dev/arcp/core/events/EventBody.java @@ -41,11 +41,22 @@ public String wire() { return wire; } + private static final java.util.Map BY_WIRE; + + static { + java.util.Map m = new java.util.HashMap<>(); + for (Kind k : values()) { + m.put(k.wire, k); + } + BY_WIRE = java.util.Collections.unmodifiableMap(m); + } + public static Kind fromWire(String wire) { - return java.util.Arrays.stream(values()) - .filter(k -> k.wire.equals(wire)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("unknown event kind: " + wire)); + Kind k = BY_WIRE.get(wire); + if (k == null) { + throw new IllegalArgumentException("unknown event kind: " + wire); + } + return k; } } } diff --git a/arcp-core/src/main/java/dev/arcp/core/messages/Message.java b/arcp-core/src/main/java/dev/arcp/core/messages/Message.java index 4d4fb39..51d1e94 100644 --- a/arcp-core/src/main/java/dev/arcp/core/messages/Message.java +++ b/arcp-core/src/main/java/dev/arcp/core/messages/Message.java @@ -54,11 +54,22 @@ public String wire() { return wire; } + private static final java.util.Map BY_WIRE; + + static { + java.util.Map m = new java.util.HashMap<>(); + for (Type t : values()) { + m.put(t.wire, t); + } + BY_WIRE = java.util.Collections.unmodifiableMap(m); + } + public static Type fromWire(String wire) { - return java.util.Arrays.stream(values()) - .filter(t -> t.wire.equals(wire)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("unknown message type: " + wire)); + Type t = BY_WIRE.get(wire); + if (t == null) { + throw new IllegalArgumentException("unknown message type: " + wire); + } + return t; } } } diff --git a/arcp-core/src/main/java/dev/arcp/core/transport/MemoryTransport.java b/arcp-core/src/main/java/dev/arcp/core/transport/MemoryTransport.java index 5f0c449..44461e9 100644 --- a/arcp-core/src/main/java/dev/arcp/core/transport/MemoryTransport.java +++ b/arcp-core/src/main/java/dev/arcp/core/transport/MemoryTransport.java @@ -2,6 +2,7 @@ import dev.arcp.core.wire.Envelope; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; @@ -12,24 +13,42 @@ */ public final class MemoryTransport implements Transport { + /** + * Typed pair of {@link MemoryTransport} endpoints returned by {@link #pair()}. The {@code + * runtime} endpoint is intended to be attached to an {@code ArcpRuntime} and the {@code client} + * endpoint to an {@code ArcpClient}. Either component is functionally interchangeable, but the + * names exist to remove the index-juggling that array returns required. + */ + public record Pair(MemoryTransport runtime, MemoryTransport client) {} + private final SubmissionPublisher inbound; + private final ExecutorService executor; private volatile MemoryTransport peer; private volatile boolean closed; - private MemoryTransport(SubmissionPublisher inbound) { + private MemoryTransport(SubmissionPublisher inbound, ExecutorService executor) { this.inbound = inbound; + this.executor = executor; } - public static MemoryTransport[] pair() { + /** + * Construct a fresh pair of cross-wired {@link MemoryTransport} endpoints. Each {@code send} on + * either endpoint delivers the envelope to the other's {@link #incoming()} publisher. + * + * @return a {@link Pair} of newly constructed endpoints. + */ + public static Pair pair() { // Per-transport publishers using virtual-thread executors keep the // dispatch hop off the caller's thread and out of any platform pool. - var aPub = new SubmissionPublisher(Executors.newVirtualThreadPerTaskExecutor(), 1024); - var bPub = new SubmissionPublisher(Executors.newVirtualThreadPerTaskExecutor(), 1024); - MemoryTransport a = new MemoryTransport(aPub); - MemoryTransport b = new MemoryTransport(bPub); + ExecutorService aExec = Executors.newVirtualThreadPerTaskExecutor(); + ExecutorService bExec = Executors.newVirtualThreadPerTaskExecutor(); + var aPub = new SubmissionPublisher(aExec, 1024); + var bPub = new SubmissionPublisher(bExec, 1024); + MemoryTransport a = new MemoryTransport(aPub, aExec); + MemoryTransport b = new MemoryTransport(bPub, bExec); a.peer = b; b.peer = a; - return new MemoryTransport[] {a, b}; + return new Pair(a, b); } @Override @@ -57,5 +76,6 @@ public void close() { } closed = true; inbound.close(); + executor.shutdown(); } } diff --git a/arcp-core/src/main/java/dev/arcp/core/transport/StdioTransport.java b/arcp-core/src/main/java/dev/arcp/core/transport/StdioTransport.java index 4a0c886..52d0eca 100644 --- a/arcp-core/src/main/java/dev/arcp/core/transport/StdioTransport.java +++ b/arcp-core/src/main/java/dev/arcp/core/transport/StdioTransport.java @@ -13,6 +13,7 @@ import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; @@ -49,6 +50,7 @@ public final class StdioTransport implements Transport { private final ReentrantLock writeLock = new ReentrantLock(); private final BufferedReader reader; private final SubmissionPublisher inbound; + private final ExecutorService inboundExecutor; private final Thread readerThread; private volatile boolean closed; @@ -64,7 +66,8 @@ public StdioTransport(InputStream in, OutputStream out, ObjectMapper mapper) { this.reader = new BufferedReader( new InputStreamReader(Objects.requireNonNull(in, "in"), StandardCharsets.UTF_8)); - this.inbound = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.inboundExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.inbound = new SubmissionPublisher<>(inboundExecutor, 1024); this.readerThread = Thread.ofVirtual().name("arcp-stdio-reader").unstarted(this::readLoop); } @@ -137,6 +140,7 @@ public void close() { // best-effort close } inbound.close(); + inboundExecutor.shutdown(); readerThread.interrupt(); } } diff --git a/arcp-core/src/main/java/dev/arcp/core/wire/ArcpMapper.java b/arcp-core/src/main/java/dev/arcp/core/wire/ArcpMapper.java index 9795ea0..b962b51 100644 --- a/arcp-core/src/main/java/dev/arcp/core/wire/ArcpMapper.java +++ b/arcp-core/src/main/java/dev/arcp/core/wire/ArcpMapper.java @@ -36,9 +36,8 @@ public static ObjectMapper create() { /** * Shared mapper for hot-path use. * - * Treat this instance as read-only. Reconfiguring the shared mapper can - * affect unrelated callers because the same ObjectMapper is reused across - * the SDK. + *

Treat this instance as read-only. Reconfiguring the shared mapper can affect unrelated + * callers because the same ObjectMapper is reused across the SDK. */ public static ObjectMapper shared() { return SHARED; diff --git a/arcp-core/src/test/java/dev/arcp/core/auth/BearerVerifierTest.java b/arcp-core/src/test/java/dev/arcp/core/auth/BearerVerifierTest.java new file mode 100644 index 0000000..92c71de --- /dev/null +++ b/arcp-core/src/test/java/dev/arcp/core/auth/BearerVerifierTest.java @@ -0,0 +1,35 @@ +package dev.arcp.core.auth; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.arcp.core.error.UnauthenticatedException; +import org.junit.jupiter.api.Test; + +class BearerVerifierTest { + + @Test + void staticToken_acceptsMatching() { + BearerVerifier v = BearerVerifier.staticToken("sekret", new Principal("p")); + assertDoesNotThrow(() -> v.verify("sekret")); + } + + @Test + void staticToken_rejectsNullAndMismatch() { + BearerVerifier v = BearerVerifier.staticToken("sekret", new Principal("p")); + assertThrows(UnauthenticatedException.class, () -> v.verify(null)); + assertThrows(UnauthenticatedException.class, () -> v.verify("wrong")); + assertThrows(UnauthenticatedException.class, () -> v.verify("sekrett")); + } + + @Test + void acceptAny_distinguishesHashCodeCollidingTokens() throws Exception { + // "Aa" and "BB" share the same String.hashCode() (2112). With a hashCode-derived + // principal id, both would authenticate as the same identity. The fix uses SHA-256. + BearerVerifier v = BearerVerifier.acceptAny(); + Principal a = v.verify("Aa"); + Principal b = v.verify("BB"); + assertNotEquals(a.id(), b.id()); + } +} diff --git a/arcp-core/src/test/java/dev/arcp/core/capabilities/CapabilitiesTest.java b/arcp-core/src/test/java/dev/arcp/core/capabilities/CapabilitiesTest.java new file mode 100644 index 0000000..656dec8 --- /dev/null +++ b/arcp-core/src/test/java/dev/arcp/core/capabilities/CapabilitiesTest.java @@ -0,0 +1,26 @@ +package dev.arcp.core.capabilities; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.EnumSet; +import java.util.Set; +import org.junit.jupiter.api.Test; + +class CapabilitiesTest { + + @Test + void emptyFeatureSet_doesNotThrow() { + Capabilities c = Capabilities.of(Set.of()); + assertTrue(c.features().isEmpty()); + assertTrue(c.featuresWire().isEmpty()); + } + + @Test + void compactConstructorCopiesCollections() { + EnumSet mutable = EnumSet.of(Feature.HEARTBEAT); + Capabilities c = Capabilities.of(mutable); + mutable.add(Feature.ACK); + assertEquals(1, c.features().size()); + } +} diff --git a/arcp-middleware-jakarta/src/main/java/dev/arcp/middleware/jakarta/JakartaWebSocketTransport.java b/arcp-middleware-jakarta/src/main/java/dev/arcp/middleware/jakarta/JakartaWebSocketTransport.java index ef61171..cf9ea5f 100644 --- a/arcp-middleware-jakarta/src/main/java/dev/arcp/middleware/jakarta/JakartaWebSocketTransport.java +++ b/arcp-middleware-jakarta/src/main/java/dev/arcp/middleware/jakarta/JakartaWebSocketTransport.java @@ -8,9 +8,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.locks.ReentrantLock; /** Bridges a {@link jakarta.websocket.Session} to the ARCP {@link Transport} SPI. */ final class JakartaWebSocketTransport implements Transport { @@ -18,11 +20,14 @@ final class JakartaWebSocketTransport implements Transport { private final Session session; private final ObjectMapper mapper; private final SubmissionPublisher inbound; + private final ExecutorService inboundExecutor; + private final ReentrantLock writeLock = new ReentrantLock(); JakartaWebSocketTransport(Session session, ObjectMapper mapper) { this.session = Objects.requireNonNull(session, "session"); this.mapper = mapper != null ? mapper : ArcpMapper.shared(); - this.inbound = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.inboundExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.inbound = new SubmissionPublisher<>(inboundExecutor, 1024); } void deliver(String frame) { @@ -43,11 +48,14 @@ void failInbound(Throwable t) { @Override public void send(Envelope envelope) { + writeLock.lock(); try { String json = mapper.writeValueAsString(envelope); session.getBasicRemote().sendText(json); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + writeLock.unlock(); } } @@ -64,5 +72,6 @@ public void close() { // best-effort close } inbound.close(); + inboundExecutor.shutdown(); } } diff --git a/arcp-middleware-spring-boot/src/main/java/dev/arcp/middleware/spring/SpringWebSocketTransport.java b/arcp-middleware-spring-boot/src/main/java/dev/arcp/middleware/spring/SpringWebSocketTransport.java index 692f248..a9c1929 100644 --- a/arcp-middleware-spring-boot/src/main/java/dev/arcp/middleware/spring/SpringWebSocketTransport.java +++ b/arcp-middleware-spring-boot/src/main/java/dev/arcp/middleware/spring/SpringWebSocketTransport.java @@ -6,9 +6,11 @@ import dev.arcp.core.wire.Envelope; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; @@ -21,11 +23,14 @@ final class SpringWebSocketTransport implements Transport { private final WebSocketSession session; private final ObjectMapper mapper; private final SubmissionPublisher inbound; + private final ExecutorService inboundExecutor; + private final ReentrantLock writeLock = new ReentrantLock(); SpringWebSocketTransport(WebSocketSession session, ObjectMapper mapper) { this.session = session; this.mapper = mapper != null ? mapper : ArcpMapper.shared(); - this.inbound = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.inboundExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.inbound = new SubmissionPublisher<>(inboundExecutor, 1024); } void deliver(String frame) { @@ -46,10 +51,13 @@ void failInbound(Throwable t) { @Override public void send(Envelope envelope) { + writeLock.lock(); try { session.sendMessage(new TextMessage(mapper.writeValueAsString(envelope))); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + writeLock.unlock(); } } @@ -66,5 +74,6 @@ public void close() { // best-effort close } inbound.close(); + inboundExecutor.shutdown(); } } diff --git a/arcp-middleware-vertx/src/main/java/dev/arcp/middleware/vertx/VertxWebSocketTransport.java b/arcp-middleware-vertx/src/main/java/dev/arcp/middleware/vertx/VertxWebSocketTransport.java index 2dc1c06..a9b8709 100644 --- a/arcp-middleware-vertx/src/main/java/dev/arcp/middleware/vertx/VertxWebSocketTransport.java +++ b/arcp-middleware-vertx/src/main/java/dev/arcp/middleware/vertx/VertxWebSocketTransport.java @@ -8,9 +8,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.locks.ReentrantLock; /** Bridges a Vert.x {@link ServerWebSocket} to the ARCP {@link Transport} SPI. */ final class VertxWebSocketTransport implements Transport { @@ -18,11 +20,14 @@ final class VertxWebSocketTransport implements Transport { private final ServerWebSocket socket; private final ObjectMapper mapper; private final SubmissionPublisher inbound; + private final ExecutorService inboundExecutor; + private final ReentrantLock writeLock = new ReentrantLock(); VertxWebSocketTransport(ServerWebSocket socket, ObjectMapper mapper) { this.socket = Objects.requireNonNull(socket, "socket"); this.mapper = mapper != null ? mapper : ArcpMapper.shared(); - this.inbound = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.inboundExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.inbound = new SubmissionPublisher<>(inboundExecutor, 1024); } void deliver(String frame) { @@ -43,11 +48,14 @@ void failInbound(Throwable t) { @Override public void send(Envelope envelope) { + writeLock.lock(); try { String json = mapper.writeValueAsString(envelope); socket.writeTextMessage(json); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + writeLock.unlock(); } } @@ -60,5 +68,6 @@ public Flow.Publisher incoming() { public void close() { socket.close(); inbound.close(); + inboundExecutor.shutdown(); } } diff --git a/arcp-otel/src/main/java/dev/arcp/otel/ArcpOtel.java b/arcp-otel/src/main/java/dev/arcp/otel/ArcpOtel.java index 984adfc..e0381a7 100644 --- a/arcp-otel/src/main/java/dev/arcp/otel/ArcpOtel.java +++ b/arcp-otel/src/main/java/dev/arcp/otel/ArcpOtel.java @@ -1,5 +1,6 @@ package dev.arcp.otel; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import dev.arcp.core.transport.Transport; import dev.arcp.core.wire.Envelope; @@ -176,11 +177,15 @@ private Envelope injectTraceContext(Envelope envelope) { return envelope; } ObjectNode payload = envelope.payload().deepCopy(); - ObjectNode ext = (ObjectNode) payload.path("extensions"); - if (ext.isMissingNode()) { - ext = payload.putObject("extensions"); - } else if (!payload.has("extensions")) { + JsonNode existing = payload.path("extensions"); + ObjectNode ext; + if (existing.isMissingNode()) { ext = payload.putObject("extensions"); + } else if (existing.isObject()) { + ext = (ObjectNode) existing; + } else { + // Non-object "extensions" — cannot safely inject; leave envelope unchanged. + return envelope; } ObjectNode otelNode = ext.putObject(EXTENSION_NAME); for (var e : carrier.entrySet()) { @@ -198,11 +203,16 @@ private Envelope injectTraceContext(Envelope envelope) { } private Context extractTraceContext(Envelope envelope) { - ObjectNode ext = (ObjectNode) envelope.payload().path("extensions"); - if (ext.isMissingNode() || !ext.has(EXTENSION_NAME)) { + JsonNode extNode = envelope.payload().path("extensions"); + if (!extNode.isObject()) { + return Context.current(); + } + ObjectNode ext = (ObjectNode) extNode; + JsonNode otelRaw = ext.get(EXTENSION_NAME); + if (otelRaw == null || !otelRaw.isObject()) { return Context.current(); } - ObjectNode otelNode = (ObjectNode) ext.get(EXTENSION_NAME); + ObjectNode otelNode = (ObjectNode) otelRaw; Map carrier = new HashMap<>(); otelNode.fieldNames().forEachRemaining(k -> carrier.put(k, otelNode.get(k).asText())); return propagator.extract(Context.current(), carrier, TextMapGetterImpl.INSTANCE); diff --git a/arcp-otel/src/test/java/dev/arcp/otel/ArcpOtelTest.java b/arcp-otel/src/test/java/dev/arcp/otel/ArcpOtelTest.java index 3dc5e93..043407e 100644 --- a/arcp-otel/src/test/java/dev/arcp/otel/ArcpOtelTest.java +++ b/arcp-otel/src/test/java/dev/arcp/otel/ArcpOtelTest.java @@ -29,9 +29,9 @@ void wrappingEmitsSendAndReceiveSpans() throws Exception { OpenTelemetry otel = OpenTelemetrySdk.builder().setTracerProvider(provider).build(); Tracer tracer = otel.getTracer("arcp-otel-test"); - MemoryTransport[] pair = MemoryTransport.pair(); - Transport sender = ArcpOtel.withTracing(pair[0], tracer); - Transport receiver = ArcpOtel.withTracing(pair[1], tracer); + MemoryTransport.Pair pair = MemoryTransport.pair(); + Transport sender = ArcpOtel.withTracing(pair.runtime(), tracer); + Transport receiver = ArcpOtel.withTracing(pair.client(), tracer); CopyOnWriteArrayList received = new CopyOnWriteArrayList<>(); receiver diff --git a/arcp-runtime-jetty/src/main/java/dev/arcp/runtime/jetty/WebSocketJsonTransport.java b/arcp-runtime-jetty/src/main/java/dev/arcp/runtime/jetty/WebSocketJsonTransport.java index 15faf14..6023732 100644 --- a/arcp-runtime-jetty/src/main/java/dev/arcp/runtime/jetty/WebSocketJsonTransport.java +++ b/arcp-runtime-jetty/src/main/java/dev/arcp/runtime/jetty/WebSocketJsonTransport.java @@ -8,9 +8,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,11 +27,14 @@ final class WebSocketJsonTransport implements Transport { private final Session session; private final ObjectMapper mapper; private final SubmissionPublisher inbound; + private final ExecutorService inboundExecutor; + private final ReentrantLock writeLock = new ReentrantLock(); WebSocketJsonTransport(Session session, ObjectMapper mapper) { this.session = Objects.requireNonNull(session, "session"); this.mapper = mapper != null ? mapper : ArcpMapper.shared(); - this.inbound = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024); + this.inboundExecutor = Executors.newVirtualThreadPerTaskExecutor(); + this.inbound = new SubmissionPublisher<>(inboundExecutor, 1024); } void deliver(String frame) { @@ -51,11 +56,14 @@ void failInbound(Throwable t) { @Override public void send(Envelope envelope) { + writeLock.lock(); try { String json = mapper.writeValueAsString(envelope); session.getBasicRemote().sendText(json); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + writeLock.unlock(); } } @@ -72,5 +80,6 @@ public void close() { // best-effort close } inbound.close(); + inboundExecutor.shutdown(); } } diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/ArcpRuntime.java b/arcp-runtime/src/main/java/dev/arcp/runtime/ArcpRuntime.java index 366cc85..e78700f 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/ArcpRuntime.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/ArcpRuntime.java @@ -42,7 +42,9 @@ public final class ArcpRuntime implements AutoCloseable { private final int resumeBufferCapacity; private final Clock clock; private final ExecutorService workerPool; + private final boolean ownedWorkerPool; private final ScheduledExecutorService scheduler; + private final boolean ownedScheduler; private final String runtimeName; private final String runtimeVersion; private final IdempotencyStore idempotency; @@ -68,25 +70,31 @@ private ArcpRuntime(Builder b) { this.resumeWindowSec = b.resumeWindowSec; this.resumeBufferCapacity = b.resumeBufferCapacity; this.clock = b.clock != null ? b.clock : Clock.systemUTC(); - this.workerPool = - b.workerPool != null ? b.workerPool : Executors.newVirtualThreadPerTaskExecutor(); - this.scheduler = - b.scheduler != null - ? b.scheduler - : Executors.newScheduledThreadPool( - 1, - r -> - Thread.ofPlatform() - .name("arcp-runtime-scheduler", 0) - .daemon(true) - .unstarted(r)); + if (b.workerPool != null) { + this.workerPool = b.workerPool; + this.ownedWorkerPool = false; + } else { + this.workerPool = Executors.newVirtualThreadPerTaskExecutor(); + this.ownedWorkerPool = true; + } + if (b.scheduler != null) { + this.scheduler = b.scheduler; + this.ownedScheduler = false; + } else { + this.scheduler = + Executors.newScheduledThreadPool( + 1, + r -> Thread.ofPlatform().name("arcp-runtime-scheduler", 0).daemon(true).unstarted(r)); + this.ownedScheduler = true; + } this.runtimeName = b.runtimeName; this.runtimeVersion = b.runtimeVersion; - this.idempotency = new IdempotencyStore(this.clock, b.idempotencyTtl); + this.idempotency = + new IdempotencyStore(this.clock, b.idempotencyTtl, this.scheduler, Duration.ofMinutes(1)); } private static Set effectiveFeatures(Builder b) { - EnumSet features = EnumSet.copyOf(b.advertised); + EnumSet features = safeFeatureCopy(b.advertised); boolean hasProvisioner = b.credentialProvisioner != null && b.credentialProvisioner != NoopCredentialProvisioner.INSTANCE; @@ -98,7 +106,14 @@ private static Set effectiveFeatures(Builder b) { features.remove(Feature.MODEL_USE); features.remove(Feature.PROVISIONED_CREDENTIALS); } - return features; + return java.util.Collections.unmodifiableSet(features); + } + + static EnumSet safeFeatureCopy(Set features) { + if (features == null || features.isEmpty()) { + return EnumSet.noneOf(Feature.class); + } + return EnumSet.copyOf(features); } public static Builder builder() { @@ -109,7 +124,10 @@ public static Builder builder() { public SessionLoop accept(Transport transport) { SessionLoop loop = new SessionLoop(this, transport); loop.start(); - sessions.put(loop.idOrPending(), loop); + // Always key by the stable pending id, not idOrPending() — the latter flips to the + // real session id after handshake and would leave the map keyed by a now-orphaned + // string, so removeSession would never find the entry (#23). + sessions.put(loop.pendingKey(), loop); return loop; } @@ -190,6 +208,10 @@ public Collection jobs() { } public void removeSession(SessionLoop loop) { + // Always remove via the pending key the loop was inserted under, since + // idOrPending() flips to the real session id after handshake and would + // otherwise leak the closed session in the map (#23). + sessions.remove(loop.pendingKey(), loop); sessions.remove(loop.idOrPending(), loop); } @@ -199,8 +221,13 @@ public void close() { loop.shutdown("runtime closing"); } sessions.clear(); - scheduler.shutdownNow(); - workerPool.shutdown(); + idempotency.close(); + if (ownedScheduler) { + scheduler.shutdownNow(); + } + if (ownedWorkerPool) { + workerPool.shutdown(); + } } public static final class Builder { @@ -244,7 +271,7 @@ public Builder verifier(BearerVerifier v) { } public Builder features(Set features) { - this.advertised = EnumSet.copyOf(features); + this.advertised = safeFeatureCopy(features); this.featuresConfigured = true; return this; } diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialBinding.java b/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialBinding.java index a486f36..1f4ce7c 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialBinding.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialBinding.java @@ -11,6 +11,7 @@ import java.time.Clock; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletionException; import java.util.function.BiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +88,27 @@ private void revoke(IssuedCredential credential) { provisioner.revoke(id).join(); store.markRevoked(id); return; + } catch (CompletionException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + if (attempt == MAX_REVOKE_ATTEMPTS) { + log.warn("credential revoke failed after {} attempts for {}", attempt, id, cause); + store.markRevocationFailed(id, cause); + return; + } + log.debug("credential revoke attempt {} failed for {}", attempt, id, cause); } catch (RuntimeException e) { if (attempt == MAX_REVOKE_ATTEMPTS) { - log.warn("credential revoke failed after {} attempts for {}", attempt, id); + log.warn("credential revoke failed after {} attempts for {}", attempt, id, e); + store.markRevocationFailed(id, e); return; } + log.debug("credential revoke attempt {} failed for {}", attempt, id, e); + } + try { + Thread.sleep(100L * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; } } } diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialRevocationStore.java b/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialRevocationStore.java index b040d04..c9cc4f2 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialRevocationStore.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/CredentialRevocationStore.java @@ -8,6 +8,14 @@ public interface CredentialRevocationStore { void markRevoked(CredentialId id); + /** + * Note that revocation failed for this credential after exhausting retries. Default + * implementation is a no-op so existing implementations continue to compile. + */ + default void markRevocationFailed(CredentialId id, Throwable cause) { + // No-op default; durable stores may persist the failure for asynchronous recovery. + } + List outstanding(); record Outstanding(CredentialId id, String providerHandle) {} diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/FileCredentialRevocationStore.java b/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/FileCredentialRevocationStore.java index a00bc52..69ac540 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/FileCredentialRevocationStore.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/credentials/FileCredentialRevocationStore.java @@ -6,20 +6,21 @@ import dev.arcp.core.credentials.CredentialId; import dev.arcp.core.wire.ArcpMapper; import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -public final class FileCredentialRevocationStore implements CredentialRevocationStore { +public final class FileCredentialRevocationStore + implements CredentialRevocationStore, AutoCloseable { private final Path path; private final ObjectMapper mapper; private final Map outstanding = new LinkedHashMap<>(); + private final RandomAccessFile writer; public FileCredentialRevocationStore(Path path) { this(path, ArcpMapper.shared()); @@ -29,6 +30,21 @@ public FileCredentialRevocationStore(Path path, ObjectMapper mapper) { this.path = path; this.mapper = mapper; load(); + try { + this.writer = new RandomAccessFile(path.toFile(), "rwd"); + this.writer.seek(this.writer.length()); + } catch (IOException e) { + throw new IllegalStateException("could not open credential revocation store " + path, e); + } + } + + @Override + public synchronized void close() { + try { + writer.close(); + } catch (IOException e) { + throw new IllegalStateException("could not close credential revocation store " + path, e); + } } @Override @@ -87,11 +103,11 @@ private void append(String op, CredentialId id, String providerHandle) { if (!providerHandle.isEmpty()) { event.put("provider_handle", providerHandle); } - try (BufferedWriter writer = - Files.newBufferedWriter( - path, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { - writer.write(mapper.writeValueAsString(event)); - writer.newLine(); + try { + String line = mapper.writeValueAsString(event) + "\n"; + byte[] bytes = line.getBytes(StandardCharsets.UTF_8); + writer.write(bytes); + writer.getFD().sync(); } catch (IOException e) { throw new IllegalStateException("could not append credential revocation event", e); } diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/idempotency/IdempotencyStore.java b/arcp-runtime/src/main/java/dev/arcp/runtime/idempotency/IdempotencyStore.java index 7224745..c08846a 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/idempotency/IdempotencyStore.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/idempotency/IdempotencyStore.java @@ -7,28 +7,55 @@ import java.time.Instant; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.jspecify.annotations.Nullable; /** * §7.2 idempotency: a {@code (principal, idempotency_key)} pair maps to a stable {@link JobId} * within a sliding TTL window. Submitting an identical triple returns the existing job id; a * conflicting payload yields {@code DUPLICATE_KEY}. + * + *

The fingerprint passed to {@link #claim(Principal, String, String, JobId)} is a + * collision-resistant identifier over every semantically relevant {@code JobSubmit} field; see + * {@code SessionLoop} for the canonical computation. + * + *

Expired entries are evicted by a background prune task scheduled at construction time (if a + * scheduler is supplied) rather than synchronously on every {@code claim} call. */ -public final class IdempotencyStore { +public final class IdempotencyStore implements AutoCloseable { public record Conflict(JobId existing) {} private record Key(String principal, String idempotencyKey) {} - private record Entry(JobId jobId, int payloadHash, Instant insertedAt) {} + private record Entry(JobId jobId, String fingerprint, Instant insertedAt) {} private final ConcurrentHashMap entries = new ConcurrentHashMap<>(); private final Duration ttl; private final Clock clock; + private final @Nullable ScheduledFuture pruneTask; public IdempotencyStore(Clock clock, Duration ttl) { + this(clock, ttl, null, Duration.ofMinutes(1)); + } + + public IdempotencyStore( + Clock clock, + Duration ttl, + @Nullable ScheduledExecutorService scheduler, + Duration pruneInterval) { this.clock = Objects.requireNonNull(clock, "clock"); this.ttl = Objects.requireNonNull(ttl, "ttl"); + if (scheduler != null) { + long intervalMillis = Math.max(1L, pruneInterval.toMillis()); + this.pruneTask = + scheduler.scheduleWithFixedDelay( + this::pruneQuiet, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS); + } else { + this.pruneTask = null; + } } /** @@ -36,20 +63,19 @@ public IdempotencyStore(Clock clock, Duration ttl) { * *

    *
  • {@code null}: caller proceeds with {@code freshId}. - *
  • {@code Conflict(existing)}: the same key already produced a job (identical payload → - * reuse; different payload → caller raises {@code DUPLICATE_KEY}). + *
  • {@code Conflict(existing)}: the same key already produced a job (identical fingerprint → + * reuse; different fingerprint → caller raises {@code DUPLICATE_KEY}). *
*/ public @Nullable Conflict claim( - Principal principal, String idempotencyKey, int payloadHash, JobId freshId) { - prune(); + Principal principal, String idempotencyKey, String fingerprint, JobId freshId) { Key key = new Key(principal.id(), idempotencyKey); Entry existing = entries.compute( key, (k, prior) -> { if (prior == null) { - return new Entry(freshId, payloadHash, clock.instant()); + return new Entry(freshId, fingerprint, clock.instant()); } return prior; }); @@ -59,13 +85,37 @@ public IdempotencyStore(Clock clock, Duration ttl) { return new Conflict(existing.jobId); } - public boolean matchesPayload(Principal principal, String idempotencyKey, int payloadHash) { + public boolean matchesPayload(Principal principal, String idempotencyKey, String fingerprint) { Entry e = entries.get(new Key(principal.id(), idempotencyKey)); - return e != null && e.payloadHash == payloadHash; + return e != null && e.fingerprint.equals(fingerprint); } - private void prune() { + /** + * Evict entries older than {@code ttl}. Exposed for deterministic test control; the scheduled + * background task invokes this method automatically when a scheduler was supplied at construction + * time. + */ + public void prune() { Instant now = clock.instant(); entries.values().removeIf(e -> e.insertedAt.plus(ttl).isBefore(now)); } + + int size() { + return entries.size(); + } + + private void pruneQuiet() { + try { + prune(); + } catch (RuntimeException ignored) { + // best-effort background eviction + } + } + + @Override + public void close() { + if (pruneTask != null) { + pruneTask.cancel(false); + } + } } diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/lease/LeaseGuard.java b/arcp-runtime/src/main/java/dev/arcp/runtime/lease/LeaseGuard.java index 8650326..6d76dfd 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/lease/LeaseGuard.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/lease/LeaseGuard.java @@ -6,6 +6,8 @@ import dev.arcp.core.lease.LeaseConstraints; import java.time.Clock; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; import org.jspecify.annotations.Nullable; /** @@ -17,6 +19,7 @@ public final class LeaseGuard { private final Lease lease; private final LeaseConstraints constraints; private final Clock clock; + private final ConcurrentHashMap compiledGlobs = new ConcurrentHashMap<>(); public LeaseGuard(Lease lease, LeaseConstraints constraints, Clock clock) { this.lease = lease; @@ -39,7 +42,7 @@ public void authorize(String namespace, String pattern) "lease expired at " + constraints.expiresAt() + " for " + namespace); } List patterns = lease.patterns(namespace); - if (patterns.stream().noneMatch(allowed -> matches(allowed, pattern))) { + if (patterns.stream().noneMatch(allowed -> matchesCached(allowed, pattern))) { throw new PermissionDeniedException( namespace + " does not permit " + pattern + "; allowed=" + patterns); } @@ -50,11 +53,15 @@ public void authorizeModel(String modelId) authorize("model.use", modelId); } + private boolean matchesCached(String pattern, String value) { + return compiledGlobs.computeIfAbsent(pattern, LeaseGuard::globToRegex).matcher(value).matches(); + } + static boolean matches(String pattern, String value) { return globToRegex(pattern).matcher(value).matches(); } - private static java.util.regex.Pattern globToRegex(String glob) { + static java.util.regex.Pattern globToRegex(String glob) { StringBuilder sb = new StringBuilder("^"); int i = 0; while (i < glob.length()) { diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/session/JobRecord.java b/arcp-runtime/src/main/java/dev/arcp/runtime/session/JobRecord.java index d0b338c..0f457f1 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/session/JobRecord.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/session/JobRecord.java @@ -11,11 +11,13 @@ import dev.arcp.runtime.lease.BudgetCounters; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import org.jspecify.annotations.Nullable; /** Bookkeeping for one in-flight job on the runtime side. */ @@ -59,7 +61,8 @@ public String wire() { private final CopyOnWriteArrayList subscribers = new CopyOnWriteArrayList<>(); private volatile @Nullable Future worker; private volatile @Nullable ScheduledFuture expiryWatchdog; - private final CopyOnWriteArrayList credentials = new CopyOnWriteArrayList<>(); + private final Object credentialsLock = new Object(); + private final ArrayList credentials = new ArrayList<>(); public JobRecord( JobId jobId, @@ -161,37 +164,52 @@ public List eventsSince(long eventSeq) { .toList(); } - public CopyOnWriteArrayList subscribers() { - return subscribers; + public List subscribers() { + return Collections.unmodifiableList(subscribers); + } + + public void addSubscriber(Subscriber subscriber) { + subscribers.add(subscriber); + } + + public boolean removeSubscribersWhere(Predicate predicate) { + return subscribers.removeIf(predicate); } public List credentials() { - return List.copyOf(credentials); + synchronized (credentialsLock) { + return List.copyOf(credentials); + } } public void setCredentials(List issued) { - credentials.clear(); - credentials.addAll(issued); + synchronized (credentialsLock) { + credentials.clear(); + credentials.addAll(issued); + } } public @Nullable IssuedCredential replaceCredential(CredentialId id, IssuedCredential next) { - IssuedCredential prior = null; - for (int i = 0; i < credentials.size(); i++) { - IssuedCredential current = credentials.get(i); - if (current.wire().id().equals(id)) { - prior = current; - credentials.set(i, next); - return prior; + synchronized (credentialsLock) { + for (int i = 0; i < credentials.size(); i++) { + IssuedCredential current = credentials.get(i); + if (current.wire().id().equals(id)) { + IssuedCredential prior = current; + credentials.set(i, next); + return prior; + } } + credentials.add(next); + return null; } - credentials.add(next); - return null; } public List drainCredentials() { - List drained = new ArrayList<>(credentials); - credentials.clear(); - return drained; + synchronized (credentialsLock) { + List drained = new ArrayList<>(credentials); + credentials.clear(); + return drained; + } } public record Subscriber(SessionLoop session, JobId jobId) {} diff --git a/arcp-runtime/src/main/java/dev/arcp/runtime/session/SessionLoop.java b/arcp-runtime/src/main/java/dev/arcp/runtime/session/SessionLoop.java index 800a360..0cb58b8 100644 --- a/arcp-runtime/src/main/java/dev/arcp/runtime/session/SessionLoop.java +++ b/arcp-runtime/src/main/java/dev/arcp/runtime/session/SessionLoop.java @@ -91,7 +91,8 @@ public enum Phase { private final AgentRegistry agents; private final String pendingId = "pending:" + UUID.randomUUID(); - private volatile Phase phase = Phase.AWAITING_HELLO; + private final java.util.concurrent.atomic.AtomicReference phase = + new java.util.concurrent.atomic.AtomicReference<>(Phase.AWAITING_HELLO); private volatile @Nullable SessionId sessionId; private volatile @Nullable Principal principal; private volatile Set negotiated = EnumSet.noneOf(Feature.class); @@ -129,6 +130,11 @@ public String idOrPending() { return s == null ? pendingId : s.value(); } + /** Stable key used at session insertion time; never flips even after handshake. */ + public String pendingKey() { + return pendingId; + } + public void start() { transport.incoming().subscribe(this); } @@ -160,10 +166,9 @@ public void onComplete() { } public void shutdown(String reason) { - if (phase == Phase.CLOSED) { + if (phase.getAndSet(Phase.CLOSED) == Phase.CLOSED) { return; } - phase = Phase.CLOSED; ScheduledFuture hb = heartbeatTick; if (hb != null) { hb.cancel(false); @@ -195,7 +200,7 @@ public void shutdown(String reason) { } private void handle(Envelope envelope) { - Phase p = phase; + Phase p = phase.get(); Message m; try { m = Messages.decode(mapper, envelope); @@ -246,7 +251,7 @@ private void doHandshake(SessionHello hello) { this.resumeToken = UUID.randomUUID().toString(); this.negotiated = Capabilities.intersect(hello.capabilities().features(), runtime.advertised()); - this.phase = Phase.ACTIVE; + this.phase.set(Phase.ACTIVE); Capabilities welcomeCaps = new Capabilities(List.of("json"), negotiated, agents.describe()); SessionWelcome welcome = @@ -295,7 +300,7 @@ private Principal authenticate(SessionHello hello) throws ArcpException { } private void tickHeartbeat(Duration interval) { - if (phase != Phase.ACTIVE) { + if (phase.get() != Phase.ACTIVE) { return; } if (heartbeat.shouldClose(interval)) { @@ -340,6 +345,10 @@ private void handleListJobs(MessageId requestId, SessionListJobs req) { .filter( rec -> filter.createdAfter() == null || rec.createdAt().isAfter(filter.createdAfter())) + // Deterministic order: createdAt then jobId for ties. + .sorted( + java.util.Comparator.comparing((JobRecord rec) -> rec.createdAt()) + .thenComparing(rec -> rec.jobId().value())) .map( rec -> new JobSummary( @@ -352,7 +361,34 @@ private void handleListJobs(MessageId requestId, SessionListJobs req) { rec.traceId(), rec.lastEventSeq())) .toList(); - SessionJobs response = new SessionJobs(requestId, matching, null); + + int startIndex = 0; + if (req.cursor() != null && !req.cursor().isBlank()) { + try { + byte[] decoded = java.util.Base64.getUrlDecoder().decode(req.cursor()); + startIndex = Integer.parseInt(new String(decoded, java.nio.charset.StandardCharsets.UTF_8)); + if (startIndex < 0) { + startIndex = 0; + } + } catch (IllegalArgumentException e) { + sendJobErrorTopLevel(null, ErrorCode.INVALID_REQUEST, "invalid list_jobs cursor"); + return; + } + } + int limit = req.limit() != null && req.limit() > 0 ? req.limit() : matching.size(); + int endIndex = Math.min(matching.size(), startIndex + limit); + List page = + startIndex >= matching.size() + ? List.of() + : List.copyOf(matching.subList(startIndex, endIndex)); + String nextCursor = + endIndex < matching.size() + ? java.util.Base64.getUrlEncoder() + .withoutPadding() + .encodeToString( + Integer.toString(endIndex).getBytes(java.nio.charset.StandardCharsets.UTF_8)) + : null; + SessionJobs response = new SessionJobs(requestId, page, nextCursor); send(Message.Type.SESSION_JOBS, response, sessionId, null, null, null); } @@ -373,14 +409,14 @@ private void handleSubmit(Envelope envelope, JobSubmit submit) { } } - // §7.2: idempotency. Identical (principal, key, payload) returns the prior job_id. + // §7.2: idempotency. Identical (principal, key, full-submit fingerprint) returns prior job_id. String idempotencyKey = submit.idempotencyKey(); if (idempotencyKey != null) { - int payloadHash = submit.input().hashCode() ^ submit.agent().wire().hashCode(); + String fingerprint = idempotencyFingerprint(submit); JobId fresh = JobId.generate(); - var conflict = runtime.idempotency().claim(pr, idempotencyKey, payloadHash, fresh); + var conflict = runtime.idempotency().claim(pr, idempotencyKey, fingerprint, fresh); if (conflict != null) { - if (runtime.idempotency().matchesPayload(pr, idempotencyKey, payloadHash)) { + if (runtime.idempotency().matchesPayload(pr, idempotencyKey, fingerprint)) { JobRecord prior = runtime.job(conflict.existing()); if (prior != null) { emitReplayAccepted(prior, envelope.traceId()); @@ -399,6 +435,41 @@ private void handleSubmit(Envelope envelope, JobSubmit submit) { acceptJob(envelope, submit, pr, now, JobId.generate()); } + /** + * Build a collision-resistant fingerprint over the semantically meaningful {@link JobSubmit} + * fields: agent, input, lease_request, lease_constraints, max_runtime_sec. The serialization is + * canonical (alphabetically sorted keys) so that semantically equal payloads always produce the + * same fingerprint and the comparison in {@link + * dev.arcp.runtime.idempotency.IdempotencyStore#matchesPayload} is robust to key-order changes. + */ + private String idempotencyFingerprint(JobSubmit submit) { + try { + ObjectMapper canonical = mapper.copy(); + canonical.configure( + com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + canonical.setNodeFactory( + com.fasterxml.jackson.databind.node.JsonNodeFactory.withExactBigDecimals(true)); + ObjectNode canon = canonical.createObjectNode(); + canon.put("agent", submit.agent().wire()); + canon.set("input", canonical.valueToTree(submit.input())); + if (submit.leaseRequest() != null) { + canon.set("lease_request", canonical.valueToTree(submit.leaseRequest())); + } + if (submit.leaseConstraints() != null) { + canon.set("lease_constraints", canonical.valueToTree(submit.leaseConstraints())); + } + if (submit.maxRuntimeSec() != null) { + canon.put("max_runtime_sec", submit.maxRuntimeSec()); + } + byte[] bytes = canonical.writerWithDefaultPrettyPrinter().writeValueAsBytes(canon); + java.security.MessageDigest md = java.security.MessageDigest.getInstance("SHA-256"); + return java.util.HexFormat.of().formatHex(md.digest(bytes)); + } catch (com.fasterxml.jackson.core.JsonProcessingException + | java.security.NoSuchAlgorithmException e) { + throw new IllegalStateException("idempotency fingerprint failure", e); + } + } + private void emitReplayAccepted(JobRecord prior, @Nullable TraceId traceId) { Map budgetSnapshot = prior.budget().snapshot(); JobAccepted accepted = @@ -535,7 +606,7 @@ public void emit(EventBody body) { public boolean cancelled() { return Thread.currentThread().isInterrupted() || record.status() == JobRecord.Status.CANCELLED - || phase == Phase.CLOSED; + || phase.get() == Phase.CLOSED; } @Override @@ -680,7 +751,7 @@ public void emit(EventBody body) { @Override public boolean cancelled() { - return record.status().terminal() || phase == Phase.CLOSED; + return record.status().terminal() || phase.get() == Phase.CLOSED; } @Override @@ -726,7 +797,12 @@ private void handleSubscribe(JobSubscribe sub) { null, ErrorCode.JOB_NOT_FOUND, "job not found or not visible: " + sub.jobId()); return; } - rec.subscribers().add(new JobRecord.Subscriber(this, rec.jobId())); + boolean alreadySubscribed = + rec.subscribers().stream() + .anyMatch(s -> s.session() == this && s.jobId().equals(rec.jobId())); + if (!alreadySubscribed) { + rec.addSubscriber(new JobRecord.Subscriber(this, rec.jobId())); + } boolean wantHistory = Boolean.TRUE.equals(sub.history()); long subscribedFrom = sub.fromEventSeq() != null ? sub.fromEventSeq() : 0; long now = eventSeq.get(); @@ -757,7 +833,7 @@ private void handleSubscribe(JobSubscribe sub) { private void handleUnsubscribe(JobUnsubscribe unsub) { JobRecord rec = runtime.job(unsub.jobId()); if (rec != null) { - rec.subscribers().removeIf(s -> s.session() == this); + rec.removeSubscribersWhere(s -> s.session() == this); } } @@ -809,7 +885,7 @@ private void sendJobMessage(JobRecord rec, Message.Type type, Message msg, long @Nullable TraceId tid, @Nullable JobId jid, @Nullable Long seq) { - if (phase == Phase.CLOSED) { + if (phase.get() == Phase.CLOSED) { return null; } ObjectNode payloadJson = Messages.encodePayload(mapper, payload); @@ -841,6 +917,6 @@ public Set negotiated() { } public Phase phase() { - return phase; + return phase.get(); } } diff --git a/arcp-runtime/src/test/java/dev/arcp/runtime/lease/LeaseGuardCacheTest.java b/arcp-runtime/src/test/java/dev/arcp/runtime/lease/LeaseGuardCacheTest.java new file mode 100644 index 0000000..804f97b --- /dev/null +++ b/arcp-runtime/src/test/java/dev/arcp/runtime/lease/LeaseGuardCacheTest.java @@ -0,0 +1,27 @@ +package dev.arcp.runtime.lease; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.arcp.core.error.PermissionDeniedException; +import dev.arcp.core.lease.Lease; +import dev.arcp.core.lease.LeaseConstraints; +import java.time.Clock; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class LeaseGuardCacheTest { + + @Test + void repeatedAuthorize_reusesCachedPattern() { + Lease lease = new Lease(Map.of("file.read", List.of("/etc/**", "/var/log/*.log"))); + LeaseGuard guard = new LeaseGuard(lease, LeaseConstraints.none(), Clock.systemUTC()); + for (int i = 0; i < 1000; i++) { + assertDoesNotThrow(() -> guard.authorize("file.read", "/etc/passwd")); + } + assertThrows( + PermissionDeniedException.class, + () -> guard.authorize("file.read", "/home/user/secret.txt")); + } +} diff --git a/arcp-tck/src/test/java/dev/arcp/tck/MemoryTransportTckTest.java b/arcp-tck/src/test/java/dev/arcp/tck/MemoryTransportTckTest.java index 917a4f0..ce60fc8 100644 --- a/arcp-tck/src/test/java/dev/arcp/tck/MemoryTransportTckTest.java +++ b/arcp-tck/src/test/java/dev/arcp/tck/MemoryTransportTckTest.java @@ -42,15 +42,15 @@ private static final class InProcessProvider implements TckProvider { @Override public ArcpClient connect() { - MemoryTransport[] pair = MemoryTransport.pair(); - runtime.accept(pair[0]); - return ArcpClient.builder(pair[1]).build(); + MemoryTransport.Pair pair = MemoryTransport.pair(); + runtime.accept(pair.runtime()); + return ArcpClient.builder(pair.client()).build(); } @Override public ArcpClient connectWithProvisionedCredentials( CredentialProvisioner provisioner, CredentialRevocationStore store) { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); credentialRuntime = ArcpRuntime.builder() .credentialProvisioner(provisioner) @@ -58,8 +58,8 @@ public ArcpClient connectWithProvisionedCredentials( .agent( "tck-echo", "1.0.0", (input, ctx) -> JobOutcome.Success.inline(input.payload())) .build(); - credentialRuntime.accept(pair[0]); - return ArcpClient.builder(pair[1]).build(); + credentialRuntime.accept(pair.runtime()); + return ArcpClient.builder(pair.client()).build(); } @Override diff --git a/build.gradle.kts b/build.gradle.kts index 6d57a64..adee93c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,6 +13,7 @@ allprojects { subprojects { plugins.withId("java-library") { + apply(plugin = "jacoco") extensions.configure { toolchain { languageVersion.set(JavaLanguageVersion.of(21)) @@ -34,6 +35,14 @@ subprojects { } tasks.withType().configureEach { useJUnitPlatform() + finalizedBy("jacocoTestReport") + } + tasks.withType().configureEach { + dependsOn("test") + reports { + xml.required.set(true) + html.required.set(true) + } } tasks.withType().configureEach { (options as? StandardJavadocDocletOptions)?.apply { diff --git a/examples/ack-backpressure/src/main/java/dev/arcp/examples/ackbackpressure/Main.java b/examples/ack-backpressure/src/main/java/dev/arcp/examples/ackbackpressure/Main.java index f8e6d40..9d074ab 100644 --- a/examples/ack-backpressure/src/main/java/dev/arcp/examples/ackbackpressure/Main.java +++ b/examples/ack-backpressure/src/main/java/dev/arcp/examples/ackbackpressure/Main.java @@ -21,7 +21,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("ticker", "1.0.0", (input, ctx) -> { for (int i = 1; i <= 20; i++) { @@ -30,11 +30,11 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); AtomicInteger received = new AtomicInteger(); - try (ArcpClient client = ArcpClient.builder(pair[1]) + try (ArcpClient client = ArcpClient.builder(pair.client()) .autoAck(false) .features(EnumSet.allOf(Feature.class)) .build()) { diff --git a/examples/agent-versions/src/main/java/dev/arcp/examples/agentversions/Main.java b/examples/agent-versions/src/main/java/dev/arcp/examples/agentversions/Main.java index 5e5ae0d..ab02d04 100644 --- a/examples/agent-versions/src/main/java/dev/arcp/examples/agentversions/Main.java +++ b/examples/agent-versions/src/main/java/dev/arcp/examples/agentversions/Main.java @@ -14,7 +14,7 @@ /** Two registered versions; bare-name resolves to default; unknown version errors. */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("code-refactor", "1.0.0", (input, ctx) -> JobOutcome.Success.inline( @@ -24,9 +24,9 @@ public static void main(String[] args) throws Exception { JsonNodeFactory.instance.objectNode().put("v", "2.0.0"))) .build(); runtime.agents().setDefault("code-refactor", "2.0.0"); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle pinned = client.submit(ArcpClient.jobSubmit( diff --git a/examples/cancel/src/main/java/dev/arcp/examples/cancel/Main.java b/examples/cancel/src/main/java/dev/arcp/examples/cancel/Main.java index eaa1550..6a4848b 100644 --- a/examples/cancel/src/main/java/dev/arcp/examples/cancel/Main.java +++ b/examples/cancel/src/main/java/dev/arcp/examples/cancel/Main.java @@ -16,7 +16,7 @@ /** Submits a long-running job, then cancels it; the resulting future fails with CancelledException. */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("sleeper", "1.0.0", (input, ctx) -> { ctx.emit(new LogEvent("info", "sleeping")); @@ -27,9 +27,9 @@ public static void main(String[] args) throws Exception { dev.arcp.core.error.ErrorCode.CANCELLED, "cooperative"); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = client.submit( ArcpClient.jobSubmit("sleeper@1.0.0", JsonNodeFactory.instance.objectNode())); diff --git a/examples/cost-budget/src/main/java/dev/arcp/examples/costbudget/Main.java b/examples/cost-budget/src/main/java/dev/arcp/examples/costbudget/Main.java index ed5b04f..42f8016 100644 --- a/examples/cost-budget/src/main/java/dev/arcp/examples/costbudget/Main.java +++ b/examples/cost-budget/src/main/java/dev/arcp/examples/costbudget/Main.java @@ -17,7 +17,7 @@ /** Tracks a USD budget; agent over-spends and surfaces BUDGET_EXHAUSTED. */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("spender", "1.0.0", (input, ctx) -> { // Spend over budget by accumulating cost.inference metrics. @@ -30,9 +30,9 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); Lease lease = Lease.builder() .allow("tool.call", "*") diff --git a/examples/custom-auth/src/main/java/dev/arcp/examples/customauth/Main.java b/examples/custom-auth/src/main/java/dev/arcp/examples/customauth/Main.java index 0fa2bcd..4b9e033 100644 --- a/examples/custom-auth/src/main/java/dev/arcp/examples/customauth/Main.java +++ b/examples/custom-auth/src/main/java/dev/arcp/examples/customauth/Main.java @@ -42,7 +42,7 @@ public static void main(String[] args) throws Exception { return new Principal(body); }; - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .verifier(verifier) .agent("whoami", "1.0.0", @@ -50,9 +50,9 @@ public static void main(String[] args) throws Exception { JsonNodeFactory.instance.objectNode() .put("session", input.sessionId().value()))) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).bearer(validToken).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).bearer(validToken).build()) { Session session = client.connect(Duration.ofSeconds(5)); assert session.sessionId() != null; System.out.println("OK custom-auth"); diff --git a/examples/delegate/src/main/java/dev/arcp/examples/delegate/Main.java b/examples/delegate/src/main/java/dev/arcp/examples/delegate/Main.java index 8a8fee8..91ea0ba 100644 --- a/examples/delegate/src/main/java/dev/arcp/examples/delegate/Main.java +++ b/examples/delegate/src/main/java/dev/arcp/examples/delegate/Main.java @@ -20,7 +20,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -35,9 +35,9 @@ public static void main(String[] args) throws Exception { "1.0.0", (input, ctx) -> JobOutcome.Success.inline(input.payload())) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); CompletableFuture childFuture = new CompletableFuture<>(); diff --git a/examples/heartbeat/src/main/java/dev/arcp/examples/heartbeat/Main.java b/examples/heartbeat/src/main/java/dev/arcp/examples/heartbeat/Main.java index ba10f1c..7da4b59 100644 --- a/examples/heartbeat/src/main/java/dev/arcp/examples/heartbeat/Main.java +++ b/examples/heartbeat/src/main/java/dev/arcp/examples/heartbeat/Main.java @@ -13,16 +13,16 @@ /** Negotiates heartbeat at 1s interval; idle for >2s then verifies session is still active. */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .heartbeatIntervalSec(1) .agent("noop", "1.0.0", (input, ctx) -> JobOutcome.Success.inline( JsonNodeFactory.instance.objectNode())) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]) + try (ArcpClient client = ArcpClient.builder(pair.client()) .features(EnumSet.of(Feature.HEARTBEAT)) .build()) { Session session = client.connect(Duration.ofSeconds(5)); diff --git a/examples/idempotent-retry/src/main/java/dev/arcp/examples/idempotentretry/Main.java b/examples/idempotent-retry/src/main/java/dev/arcp/examples/idempotentretry/Main.java index a88a893..ff5c6aa 100644 --- a/examples/idempotent-retry/src/main/java/dev/arcp/examples/idempotentretry/Main.java +++ b/examples/idempotent-retry/src/main/java/dev/arcp/examples/idempotentretry/Main.java @@ -13,14 +13,14 @@ /** Submits the same job twice with the same idempotency_key; second returns same job_id. */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("report", "1.0.0", (input, ctx) -> JobOutcome.Success.inline(input.payload())) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); ObjectNode payload = JsonNodeFactory.instance.objectNode(); payload.put("week", 19); diff --git a/examples/lease-expires-at/src/main/java/dev/arcp/examples/leaseexpiresat/Main.java b/examples/lease-expires-at/src/main/java/dev/arcp/examples/leaseexpiresat/Main.java index 11f2ca3..e7371d2 100644 --- a/examples/lease-expires-at/src/main/java/dev/arcp/examples/leaseexpiresat/Main.java +++ b/examples/lease-expires-at/src/main/java/dev/arcp/examples/leaseexpiresat/Main.java @@ -19,16 +19,16 @@ public final class Main { public static void main(String[] args) throws Exception { CountDownLatch release = new CountDownLatch(1); - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("idle", "1.0.0", (input, ctx) -> { release.await(5, TimeUnit.SECONDS); return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); Lease lease = Lease.builder().allow("tool.call", "*").build(); LeaseConstraints constraints = LeaseConstraints.of( diff --git a/examples/lease-violation/src/main/java/dev/arcp/examples/leaseviolation/Main.java b/examples/lease-violation/src/main/java/dev/arcp/examples/leaseviolation/Main.java index 2836803..de7c6e5 100644 --- a/examples/lease-violation/src/main/java/dev/arcp/examples/leaseviolation/Main.java +++ b/examples/lease-violation/src/main/java/dev/arcp/examples/leaseviolation/Main.java @@ -18,7 +18,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -30,9 +30,9 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); Lease lease = Lease.builder().allow("fs.read", "*").build(); diff --git a/examples/list-jobs/src/main/java/dev/arcp/examples/listjobs/Main.java b/examples/list-jobs/src/main/java/dev/arcp/examples/listjobs/Main.java index 614ecec..26f2ff8 100644 --- a/examples/list-jobs/src/main/java/dev/arcp/examples/listjobs/Main.java +++ b/examples/list-jobs/src/main/java/dev/arcp/examples/listjobs/Main.java @@ -16,7 +16,7 @@ public final class Main { public static void main(String[] args) throws Exception { CountDownLatch hold = new CountDownLatch(1); - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("blocker", "1.0.0", (input, ctx) -> { hold.await(); @@ -25,9 +25,9 @@ public static void main(String[] args) throws Exception { .agent("fast", "1.0.0", (input, ctx) -> JobOutcome.Success.inline(input.payload())) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); var fast = client.submit(ArcpClient.jobSubmit( "fast@1.0.0", JsonNodeFactory.instance.objectNode())); diff --git a/examples/progress/src/main/java/dev/arcp/examples/progress/Main.java b/examples/progress/src/main/java/dev/arcp/examples/progress/Main.java index a6d354a..e1f3a98 100644 --- a/examples/progress/src/main/java/dev/arcp/examples/progress/Main.java +++ b/examples/progress/src/main/java/dev/arcp/examples/progress/Main.java @@ -20,7 +20,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -35,9 +35,9 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); AtomicInteger count = new AtomicInteger(); diff --git a/examples/provisioned-credentials/src/main/java/dev/arcp/examples/provisionedcredentials/Main.java b/examples/provisioned-credentials/src/main/java/dev/arcp/examples/provisionedcredentials/Main.java index a2f1ba2..f3bfae4 100644 --- a/examples/provisioned-credentials/src/main/java/dev/arcp/examples/provisionedcredentials/Main.java +++ b/examples/provisioned-credentials/src/main/java/dev/arcp/examples/provisionedcredentials/Main.java @@ -24,7 +24,7 @@ public final class Main { private Main() {} public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); AtomicBoolean revoked = new AtomicBoolean(); CredentialProvisioner provisioner = new CredentialProvisioner() { @Override @@ -61,9 +61,9 @@ public CompletableFuture revoke(CredentialId id) { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (runtime; ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (runtime; ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); Lease lease = Lease.builder() .allow("model.use", "tier-fast/*") diff --git a/examples/result-chunk/src/main/java/dev/arcp/examples/resultchunk/Main.java b/examples/result-chunk/src/main/java/dev/arcp/examples/resultchunk/Main.java index 54693ef..d157c76 100644 --- a/examples/result-chunk/src/main/java/dev/arcp/examples/resultchunk/Main.java +++ b/examples/result-chunk/src/main/java/dev/arcp/examples/resultchunk/Main.java @@ -20,7 +20,7 @@ public final class Main { public static void main(String[] args) throws Exception { ResultId resultId = ResultId.of("res_chunked"); - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("chunked", "1.0.0", (input, ctx) -> { ctx.emit(new ResultChunkEvent(resultId, 0, "hello ", "utf8", true)); @@ -29,9 +29,9 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.streamed(resultId, 18, "3 chunks"); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = client.submit(ArcpClient.jobSubmit( "chunked@1.0.0", JsonNodeFactory.instance.objectNode())); diff --git a/examples/resume/src/main/java/dev/arcp/examples/resume/Main.java b/examples/resume/src/main/java/dev/arcp/examples/resume/Main.java index a2606c4..f185498 100644 --- a/examples/resume/src/main/java/dev/arcp/examples/resume/Main.java +++ b/examples/resume/src/main/java/dev/arcp/examples/resume/Main.java @@ -26,13 +26,13 @@ public static void main(String[] args) throws Exception { .build(); // First connection: submit a job and capture resume state. - MemoryTransport[] pair1 = MemoryTransport.pair(); - runtime.accept(pair1[0]); + MemoryTransport.Pair pair1 = MemoryTransport.pair(); + runtime.accept(pair1.runtime()); String resumeToken = null; long lastSeq = 0L; - try (ArcpClient client1 = ArcpClient.builder(pair1[1]).build()) { + try (ArcpClient client1 = ArcpClient.builder(pair1.client()).build()) { client1.connect(Duration.ofSeconds(5)); JobHandle handle = client1.submit( @@ -47,10 +47,10 @@ public static void main(String[] args) throws Exception { } // Second connection: optionally resume from captured token. - MemoryTransport[] pair2 = MemoryTransport.pair(); - runtime.accept(pair2[0]); + MemoryTransport.Pair pair2 = MemoryTransport.pair(); + runtime.accept(pair2.runtime()); - ArcpClient.Builder builder2 = ArcpClient.builder(pair2[1]); + ArcpClient.Builder builder2 = ArcpClient.builder(pair2.client()); if (resumeToken != null) { builder2 = builder2.resumeToken(resumeToken).lastEventSeq(lastSeq); } diff --git a/examples/submit-and-stream/src/main/java/dev/arcp/examples/submitandstream/Main.java b/examples/submit-and-stream/src/main/java/dev/arcp/examples/submitandstream/Main.java index 409c1da..f2d80a5 100644 --- a/examples/submit-and-stream/src/main/java/dev/arcp/examples/submitandstream/Main.java +++ b/examples/submit-and-stream/src/main/java/dev/arcp/examples/submitandstream/Main.java @@ -19,7 +19,7 @@ /** Submits a job, streams its events, and prints the assembled final result. */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent("emitter", "1.0.0", (input, ctx) -> { ctx.emit(new StatusEvent("starting", null)); @@ -30,9 +30,9 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { Session session = client.connect(Duration.ofSeconds(5)); assert session.sessionId() != null; diff --git a/examples/subscribe/src/main/java/dev/arcp/examples/subscribe/Main.java b/examples/subscribe/src/main/java/dev/arcp/examples/subscribe/Main.java index eeccb2a..1e5ed63 100644 --- a/examples/subscribe/src/main/java/dev/arcp/examples/subscribe/Main.java +++ b/examples/subscribe/src/main/java/dev/arcp/examples/subscribe/Main.java @@ -22,8 +22,8 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair1 = MemoryTransport.pair(); - MemoryTransport[] pair2 = MemoryTransport.pair(); + MemoryTransport.Pair pair1 = MemoryTransport.pair(); + MemoryTransport.Pair pair2 = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() @@ -37,13 +37,13 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair1[0]); - runtime.accept(pair2[0]); + runtime.accept(pair1.runtime()); + runtime.accept(pair2.runtime()); JobId jobId; // Client A: submit the job and wait for it to complete. - try (ArcpClient clientA = ArcpClient.builder(pair1[1]).bearer("demo").build()) { + try (ArcpClient clientA = ArcpClient.builder(pair1.client()).bearer("demo").build()) { clientA.connect(Duration.ofSeconds(5)); JobHandle handle = clientA.submit( @@ -57,7 +57,7 @@ public static void main(String[] args) throws Exception { AtomicInteger replayCount = new AtomicInteger(); CompletableFuture allReplayed = new CompletableFuture<>(); - try (ArcpClient clientB = ArcpClient.builder(pair2[1]).bearer("demo").build()) { + try (ArcpClient clientB = ArcpClient.builder(pair2.client()).bearer("demo").build()) { clientB.connect(Duration.ofSeconds(5)); Flow.Publisher events = diff --git a/examples/tracing/src/main/java/dev/arcp/examples/tracing/Main.java b/examples/tracing/src/main/java/dev/arcp/examples/tracing/Main.java index df885e5..96bbf7f 100644 --- a/examples/tracing/src/main/java/dev/arcp/examples/tracing/Main.java +++ b/examples/tracing/src/main/java/dev/arcp/examples/tracing/Main.java @@ -36,7 +36,7 @@ public static void main(String[] args) throws Exception { .build() .getTracer("arcp-example-tracing"); - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -44,10 +44,10 @@ public static void main(String[] args) throws Exception { "1.0.0", (input, ctx) -> JobOutcome.Success.inline(input.payload())) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); // Wrap the client-side transport with OTel tracing. - Transport tracingTransport = ArcpOtel.withTracing(pair[1], tracer); + Transport tracingTransport = ArcpOtel.withTracing(pair.client(), tracer); try (ArcpClient client = ArcpClient.builder(tracingTransport).build()) { client.connect(Duration.ofSeconds(5)); diff --git a/examples/vendor-extensions/src/main/java/dev/arcp/examples/vendorextensions/Main.java b/examples/vendor-extensions/src/main/java/dev/arcp/examples/vendorextensions/Main.java index 4686669..2e739fd 100644 --- a/examples/vendor-extensions/src/main/java/dev/arcp/examples/vendorextensions/Main.java +++ b/examples/vendor-extensions/src/main/java/dev/arcp/examples/vendorextensions/Main.java @@ -19,7 +19,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() @@ -31,11 +31,11 @@ public static void main(String[] args) throws Exception { return JobOutcome.Success.inline(input.payload()); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); CompletableFuture thoughtReceived = new CompletableFuture<>(); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); JobHandle handle = diff --git a/recipes/email-vendor-leases/src/main/java/dev/arcp/recipes/emailvendorleases/Main.java b/recipes/email-vendor-leases/src/main/java/dev/arcp/recipes/emailvendorleases/Main.java index 06c3c2f..2934d5c 100644 --- a/recipes/email-vendor-leases/src/main/java/dev/arcp/recipes/emailvendorleases/Main.java +++ b/recipes/email-vendor-leases/src/main/java/dev/arcp/recipes/emailvendorleases/Main.java @@ -20,7 +20,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -37,9 +37,9 @@ public static void main(String[] args) throws Exception { .put("vendor", "sendgrid")); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); // Lease grants exactly the two capabilities the email agent needs. diff --git a/recipes/mcp-skill/src/main/java/dev/arcp/recipes/mcpskill/Main.java b/recipes/mcp-skill/src/main/java/dev/arcp/recipes/mcpskill/Main.java index 87e450a..3d55237 100644 --- a/recipes/mcp-skill/src/main/java/dev/arcp/recipes/mcpskill/Main.java +++ b/recipes/mcp-skill/src/main/java/dev/arcp/recipes/mcpskill/Main.java @@ -24,7 +24,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -42,11 +42,11 @@ public static void main(String[] args) throws Exception { .put("tempC", 22)); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); CompletableFuture thoughtReceived = new CompletableFuture<>(); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); // Lease allows only the mcp:weather tool call. diff --git a/recipes/multi-agent-budget/src/main/java/dev/arcp/recipes/multiagentbudget/Main.java b/recipes/multi-agent-budget/src/main/java/dev/arcp/recipes/multiagentbudget/Main.java index fc52bc4..d099637 100644 --- a/recipes/multi-agent-budget/src/main/java/dev/arcp/recipes/multiagentbudget/Main.java +++ b/recipes/multi-agent-budget/src/main/java/dev/arcp/recipes/multiagentbudget/Main.java @@ -24,7 +24,7 @@ */ public final class Main { public static void main(String[] args) throws Exception { - MemoryTransport[] pair = MemoryTransport.pair(); + MemoryTransport.Pair pair = MemoryTransport.pair(); ArcpRuntime runtime = ArcpRuntime.builder() .agent( @@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception { .put("agent", "expensive")); }) .build(); - runtime.accept(pair[0]); + runtime.accept(pair.runtime()); // Lease: allow all tool calls but cap total spend at USD 5.00. Lease lease = @@ -69,7 +69,7 @@ public static void main(String[] args) throws Exception { .allow("cost.budget", "USD:5.00") .build(); - try (ArcpClient client = ArcpClient.builder(pair[1]).build()) { + try (ArcpClient client = ArcpClient.builder(pair.client()).build()) { client.connect(Duration.ofSeconds(5)); // Cheap agent (USD 1.00) should succeed within budget. diff --git a/recipes/stream-resume/src/main/java/dev/arcp/recipes/streamresume/Main.java b/recipes/stream-resume/src/main/java/dev/arcp/recipes/streamresume/Main.java index c42b5bd..c32eab8 100644 --- a/recipes/stream-resume/src/main/java/dev/arcp/recipes/streamresume/Main.java +++ b/recipes/stream-resume/src/main/java/dev/arcp/recipes/streamresume/Main.java @@ -40,8 +40,8 @@ public static void main(String[] args) throws Exception { .build(); // --- Client 1: live consumption. --- - MemoryTransport[] pair1 = MemoryTransport.pair(); - runtime.accept(pair1[0]); + MemoryTransport.Pair pair1 = MemoryTransport.pair(); + runtime.accept(pair1.runtime()); String resumeToken = null; long lastSeq = 0L; @@ -49,7 +49,7 @@ public static void main(String[] args) throws Exception { AtomicInteger firstPassCount = new AtomicInteger(); CompletableFuture firstPassDone = new CompletableFuture<>(); - try (ArcpClient client1 = ArcpClient.builder(pair1[1]).bearer("demo").build()) { + try (ArcpClient client1 = ArcpClient.builder(pair1.client()).bearer("demo").build()) { client1.connect(Duration.ofSeconds(5)); JobHandle handle = @@ -95,13 +95,13 @@ public void onComplete() {} int firstCount = firstPassCount.get(); // --- Client 2: replay via history subscription. --- - MemoryTransport[] pair2 = MemoryTransport.pair(); - runtime.accept(pair2[0]); + MemoryTransport.Pair pair2 = MemoryTransport.pair(); + runtime.accept(pair2.runtime()); AtomicInteger replayCount = new AtomicInteger(); CompletableFuture replayDone = new CompletableFuture<>(); - ArcpClient.Builder builder2 = ArcpClient.builder(pair2[1]).bearer("demo"); + ArcpClient.Builder builder2 = ArcpClient.builder(pair2.client()).bearer("demo"); if (resumeToken != null) { builder2 = builder2.resumeToken(resumeToken).lastEventSeq(lastSeq); }