Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public CompletableResultCode shutdown() {
scheduler.shutdown();
try {
scheduler.awaitTermination(5, TimeUnit.SECONDS);
// Wait for any in-flight export to complete before performing the final collection.
// Without this, doRun() sees exportAvailable=false and drops the final metrics.
scheduled.flushInProgress.join(5, TimeUnit.SECONDS);
CompletableResultCode flushResult = scheduled.doRun();
flushResult.join(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -177,6 +180,7 @@ void start() {
private final class Scheduled implements Runnable {

private final AtomicBoolean exportAvailable = new AtomicBoolean(true);
private volatile CompletableResultCode flushInProgress = CompletableResultCode.ofSuccess();

private MetricReaderInstrumentation instrumentation =
new MetricReaderInstrumentation(COMPONENT_ID, MeterProvider.noop());
Expand All @@ -197,6 +201,7 @@ public void run() {
CompletableResultCode doRun() {
CompletableResultCode flushResult = new CompletableResultCode();
if (exportAvailable.compareAndSet(true, false)) {
flushInProgress = flushResult;
try {
long startNanoTime = CLOCK.nanoTime();
String error = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -235,6 +237,85 @@ void close_CallsShutdown() throws IOException {
verify(reader, times(1)).shutdown();
}

@Test
@Timeout(10)
void shutdown_whileExportInFlight_waitsThenPerformsFinalExport() throws Exception {
CompletableResultCode inflightExportResult = new CompletableResultCode();
CountDownLatch exportStarted = new CountDownLatch(1);
AtomicInteger exportCount = new AtomicInteger();
AtomicBoolean shutdownCalledWhileExportPending = new AtomicBoolean();

MetricExporter blockingExporter =
new MetricExporter() {
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (exportCount.incrementAndGet() == 1) {
exportStarted.countDown();
return inflightExportResult;
}
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
if (!inflightExportResult.isDone()) {
shutdownCalledWhileExportPending.set(true);
}
return CompletableResultCode.ofSuccess();
}
};

PeriodicMetricReader reader =
PeriodicMetricReader.builder(blockingExporter)
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
reader.register(collectionRegistration);

// Trigger an export that blocks
CompletableResultCode flushResult = reader.forceFlush();
assertThat(exportStarted.await(5, TimeUnit.SECONDS)).isTrue();

// Shutdown in background — should block waiting for in-flight export
CountDownLatch shutdownDone = new CountDownLatch(1);
Thread shutdownThread =
new Thread(
() -> {
reader.shutdown();
shutdownDone.countDown();
});
shutdownThread.setDaemon(true);
shutdownThread.start();

// Give shutdown() time to reach the flushInProgress.join() wait.
// Even if this executes before shutdown enters the wait, the assertions below still
// validate correctness — they just won't exercise the concurrent case.
Thread.sleep(200);

// Release the in-flight export
inflightExportResult.succeed();

// Shutdown completes
assertThat(shutdownDone.await(5, TimeUnit.SECONDS)).isTrue();

// In-flight export succeeded
flushResult.join(5, TimeUnit.SECONDS);
assertThat(flushResult.isSuccess()).isTrue();
// Final shutdown export also ran (in-flight + final = 2)
assertThat(exportCount.get()).isEqualTo(2);
// Exporter.shutdown() was not called while the in-flight export was still pending
assertThat(shutdownCalledWhileExportPending.get()).isFalse();
}

@Test
@SuppressWarnings("PreferJavaTimeOverload") // Testing the overload
void invalidConfig() {
Expand Down
Loading