Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CONFORMANCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ works" is decomposed into multiple binary rows.

- HTTP/2 + QUIC transports
- mTLS / OAuth2 auth schemes
- stdio newline-delimited JSON transport (`MemoryTransport` covers in-process use)
- §15.6 trust elevation
- Quarkus and Helidon middleware (Phase 5 deferred them; `arcp-runtime-jetty`,
`arcp-middleware-jakarta`, `arcp-middleware-spring-boot`, and
Expand Down
216 changes: 169 additions & 47 deletions arcp-client/src/main/java/dev/arcp/client/ArcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -69,6 +71,13 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope

private static final Logger log = LoggerFactory.getLogger(ArcpClient.class);

static EnumSet<Feature> safeFeatureCopy(Set<Feature> features) {
if (features == null || features.isEmpty()) {
return EnumSet.noneOf(Feature.class);
}
return EnumSet.copyOf(features);
}

private final Transport transport;
private final ObjectMapper mapper;
private final ClientInfo info;
Expand All @@ -88,6 +97,12 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
private @Nullable ScheduledFuture<?> heartbeatWatchdog;
private final ConcurrentHashMap<JobId, SubmissionPublisher<EventBody>> liveSubscribers =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<JobId, ExecutorService> liveExecutors = new ConcurrentHashMap<>();
private final boolean ownedScheduler;

// Reserved for future use by tests asserting FIFO insertion order of pending submits.
@SuppressWarnings("unused")
private final ConcurrentLinkedDeque<MessageId> pendingSubmitOrder = new ConcurrentLinkedDeque<>();

@SuppressWarnings("unused")
private Flow.@Nullable Subscription subscription;
Expand All @@ -103,16 +118,19 @@ private ArcpClient(Builder b) {
this.mapper = b.mapper != null ? b.mapper : ArcpMapper.shared();
this.info = b.info;
this.auth = b.auth;
this.requestedFeatures = EnumSet.copyOf(b.features);
this.requestedFeatures = safeFeatureCopy(b.features);
this.autoAck = b.autoAck;
this.ackInterval = b.ackInterval;
this.scheduler =
b.scheduler != null
? b.scheduler
: Executors.newScheduledThreadPool(
1,
r ->
Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r));
if (b.scheduler != null) {
this.scheduler = b.scheduler;
this.ownedScheduler = false;
} else {
this.scheduler =
Executors.newScheduledThreadPool(
1,
r -> Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r));
this.ownedScheduler = true;
}
this.resumeToken = b.resumeToken;
this.lastEventSeq = b.lastEventSeq;
}
Expand All @@ -130,11 +148,16 @@ public CompletableFuture<Session> connect() {
return sessionFuture;
}

public Session connect(Duration timeout) throws InterruptedException, TimeoutException {
public Session connect(Duration timeout)
throws InterruptedException, TimeoutException, ArcpException {
try {
return connect().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (java.util.concurrent.ExecutionException e) {
throw new IllegalStateException("connect failed", e.getCause());
Throwable cause = e.getCause() != null ? e.getCause() : e;
if (cause instanceof ArcpException ax) {
throw ax;
}
throw new IllegalStateException("connect failed", cause);
}
}

Expand All @@ -144,40 +167,102 @@ public JobHandle submit(JobSubmit submit) {

public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
Outstanding o = new Outstanding();
// Pre-register the outstanding job under a request-id keyed slot so that
// when JobAccepted arrives we associate the job_id.
MessageId requestId = MessageId.generate();
pendingSubmits.put(requestId, o);
send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId);
// The put-then-send pair must be atomic w.r.t. other submits so that the
// FIFO order of pendingSubmits matches the wire order observed by the
// runtime; that ordering is how handleAccepted correlates JobAccepted
// back to the right pending submit (the runtime does not echo our
// request id on job.accepted).
submitLock.lock();
try {
pendingSubmits.add(new PendingSubmit(requestId, o));
send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId);
} finally {
submitLock.unlock();
}
return o.handleFuture.join();
}

public Page<JobSummary> listJobs(@Nullable JobFilter filter) {
SessionListJobs req = new SessionListJobs(filter, null, null);
public Page<JobSummary> listJobs(@Nullable JobFilter filter)
throws InterruptedException, TimeoutException, ArcpException {
return listJobs(filter, null, null);
}

/**
* List jobs with optional pagination. Supply {@code cursor} from the previous {@link Page} to
* continue, or {@code null} to fetch the first page. {@code limit} caps the page size.
*/
public Page<JobSummary> listJobs(
@Nullable JobFilter filter, @Nullable Integer limit, @Nullable String cursor)
throws InterruptedException, TimeoutException, ArcpException {
SessionListJobs req = new SessionListJobs(filter, limit, cursor);
MessageId reqId = MessageId.generate();
CompletableFuture<SessionJobs> fut = new CompletableFuture<>();
listRequests.put(reqId, fut);
send(Message.Type.SESSION_LIST_JOBS, req, sessionId, null, null, null, reqId);
try {
SessionJobs response = fut.get(10, TimeUnit.SECONDS);
return new Page<>(response.jobs(), response.nextCursor());
} catch (java.util.concurrent.ExecutionException | InterruptedException | TimeoutException e) {
throw new RuntimeException("list_jobs failed", e);
} catch (InterruptedException e) {
listRequests.remove(reqId);
Thread.currentThread().interrupt();
throw e;
} catch (TimeoutException e) {
listRequests.remove(reqId);
throw e;
} catch (java.util.concurrent.ExecutionException e) {
listRequests.remove(reqId);
Throwable cause = e.getCause() != null ? e.getCause() : e;
if (cause instanceof ArcpException ax) {
throw ax;
}
throw new IllegalStateException("list_jobs failed", cause);
}
}

public Flow.Publisher<EventBody> subscribe(JobId jobId, SubscribeOptions options) {
java.util.concurrent.atomic.AtomicBoolean inserted =
new java.util.concurrent.atomic.AtomicBoolean(false);
SubmissionPublisher<EventBody> pub =
liveSubscribers.computeIfAbsent(
jobId,
k -> new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024));
JobSubscribe sub =
new JobSubscribe(
jobId, options.history() ? options.fromEventSeq() : null, options.history());
send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null);
k -> {
inserted.set(true);
ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
liveExecutors.put(jobId, exec);
return new SubmissionPublisher<>(exec, 1024);
});
if (inserted.get()) {
JobSubscribe sub =
new JobSubscribe(
jobId, options.history() ? options.fromEventSeq() : null, options.history());
send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null);
}
return pub;
}

/**
* Locally unsubscribe from job events and notify the runtime via {@code job.unsubscribe}. Closes
* the local {@link Flow.Publisher} so any downstream subscribers see {@code onComplete}.
*/
public void unsubscribe(JobId jobId) {
SubmissionPublisher<EventBody> pub = liveSubscribers.remove(jobId);
if (pub != null) {
pub.close();
}
ExecutorService exec = liveExecutors.remove(jobId);
if (exec != null) {
exec.shutdown();
}
if (!closed) {
try {
send(Message.Type.JOB_UNSUBSCRIBE, new JobUnsubscribe(jobId), sessionId, null, jobId, null);
} catch (RuntimeException ignored) {
// best-effort
}
}
}

public void ack(long lastProcessedSeq) {
send(Message.Type.SESSION_ACK, new SessionAck(lastProcessedSeq), sessionId, null, null, null);
}
Expand All @@ -204,12 +289,18 @@ public void close() {
for (var pub : liveSubscribers.values()) {
pub.close();
}
for (ExecutorService exec : liveExecutors.values()) {
exec.shutdown();
}
liveExecutors.clear();
try {
transport.close();
} catch (RuntimeException ignored) {
// best-effort close
}
scheduler.shutdownNow();
if (ownedScheduler) {
scheduler.shutdownNow();
}
}

/** Returns the highest event sequence number seen from the server, or -1 if none. */
Expand Down Expand Up @@ -248,17 +339,53 @@ public void onNext(Envelope envelope) {

@Override
public void onError(Throwable throwable) {
sessionFuture.completeExceptionally(throwable);
failAll(throwable);
}

@Override
public void onComplete() {
if (!sessionFuture.isDone()) {
sessionFuture.completeExceptionally(
new IllegalStateException("transport closed before welcome"));
failAll(new IllegalStateException("transport closed before welcome"));
} else {
// Transport closed after welcome — fail anything still in flight.
failAll(new IllegalStateException("transport closed"));
}
}

/**
* Fail every outstanding future and complete every live subscription publisher exceptionally.
* Called on transport error, unexpected transport close, and heartbeat-loss close so a caller
* blocked on {@link #submit}, {@link #listJobs}, or {@link JobHandle#result()} does not wait
* forever.
*/
private void failAll(Throwable cause) {
if (!sessionFuture.isDone()) {
sessionFuture.completeExceptionally(cause);
}
for (PendingSubmit head; (head = pendingSubmits.pollFirst()) != null; ) {
head.outstanding().handleFuture.completeExceptionally(cause);
}
for (java.util.Map.Entry<JobId, Outstanding> e : outstanding.entrySet()) {
Outstanding o = e.getValue();
if (!o.resultFuture.isDone()) {
o.resultFuture.completeExceptionally(cause);
}
o.events.close();
}
outstanding.clear();
for (java.util.Map.Entry<MessageId, CompletableFuture<SessionJobs>> e :
listRequests.entrySet()) {
if (!e.getValue().isDone()) {
e.getValue().completeExceptionally(cause);
}
}
listRequests.clear();
for (SubmissionPublisher<EventBody> pub : liveSubscribers.values()) {
pub.closeExceptionally(cause);
}
liveSubscribers.clear();
}

private void dispatch(Envelope envelope) {
Message m;
try {
Expand Down Expand Up @@ -338,25 +465,23 @@ private void watchHeartbeat(long intervalMs) {
long elapsed = System.currentTimeMillis() - lastInboundMillis.get();
if (elapsed > intervalMs * 2) {
log.info("client observed heartbeat loss; closing session");
failAll(new IllegalStateException("heartbeat lost"));
close();
}
}

private final ConcurrentHashMap<MessageId, Outstanding> pendingSubmits =
new ConcurrentHashMap<>();
private record PendingSubmit(MessageId requestId, Outstanding outstanding) {}

private final ConcurrentLinkedDeque<PendingSubmit> pendingSubmits = new ConcurrentLinkedDeque<>();
private final java.util.concurrent.locks.ReentrantLock submitLock =
new java.util.concurrent.locks.ReentrantLock();

private void handleAccepted(Envelope envelope, JobAccepted accepted) {
// We associate by traversing pending submits in insertion order; the
// runtime guarantees ordering per-session, so the oldest pending submit
// is the one being acknowledged.
MessageId match = pendingSubmits.keySet().stream().findFirst().orElse(null);
if (match == null) {
return;
}
Outstanding o = pendingSubmits.remove(match);
if (o == null) {
PendingSubmit head = pendingSubmits.pollFirst();
if (head == null) {
return;
}
Outstanding o = head.outstanding();
o.jobId = accepted.jobId();
outstanding.put(accepted.jobId(), o);
o.handleFuture.complete(new ClientJobHandle(accepted, o));
Expand Down Expand Up @@ -395,14 +520,11 @@ private void handleError(Envelope envelope, JobError err) {
JobId jid = envelope.jobId();
Outstanding o = jid != null ? outstanding.remove(jid) : null;
if (o == null) {
// Top-level (unassigned) error: drop the oldest pending submit.
MessageId first = pendingSubmits.keySet().stream().findFirst().orElse(null);
if (first != null) {
Outstanding pending = pendingSubmits.remove(first);
if (pending != null) {
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
pending.handleFuture.completeExceptionally(ex);
}
// Top-level (unassigned) error: fail the oldest pending submit.
PendingSubmit head = pendingSubmits.pollFirst();
if (head != null) {
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
head.outstanding().handleFuture.completeExceptionally(ex);
}
return;
}
Expand Down Expand Up @@ -536,7 +658,7 @@ public Builder bearer(String token) {
}

public Builder features(Set<Feature> features) {
this.features = EnumSet.copyOf(features);
this.features = safeFeatureCopy(features);
return this;
}

Expand Down
4 changes: 4 additions & 0 deletions arcp-client/src/main/java/dev/arcp/client/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import org.jspecify.annotations.Nullable;

public record Page<T>(List<T> items, @Nullable String nextCursor) {
public Page {
items = items == null ? List.of() : List.copyOf(items);
}

public static Page<JobSummary> empty() {
return new Page<>(List.of(), null);
}
Expand Down
Loading
Loading