From 5d499ee04ceff86564d31c8499ec573f8bc11479 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 15 Apr 2026 16:48:23 -0700 Subject: [PATCH] Fix PeriodicMetricReader shutdown race losing final metrics When a periodic export is in-flight during shutdown, doRun() sees exportAvailable=false and drops the final collection. Then exporter.shutdown() cancels the in-flight export via cancelAll(). Fix: wait for any in-flight flush to complete before the final doRun() in shutdown(). Track the in-flight flush via a volatile flushInProgress field set at the start of each doRun() cycle. --- .../metrics/export/PeriodicMetricReader.java | 5 ++ .../export/PeriodicMetricReaderTest.java | 81 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index fe0d87bce94..345a8b11e3f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -117,6 +117,9 @@ public CompletableResultCode shutdown() { scheduler.shutdown(); try { scheduler.awaitTermination(5, TimeUnit.SECONDS); + // Wait for any in-flight export to complete before performing the final collection. + // Without this, doRun() sees exportAvailable=false and drops the final metrics. + scheduled.flushInProgress.join(5, TimeUnit.SECONDS); CompletableResultCode flushResult = scheduled.doRun(); flushResult.join(5, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -177,6 +180,7 @@ void start() { private final class Scheduled implements Runnable { private final AtomicBoolean exportAvailable = new AtomicBoolean(true); + private volatile CompletableResultCode flushInProgress = CompletableResultCode.ofSuccess(); private MetricReaderInstrumentation instrumentation = new MetricReaderInstrumentation(COMPONENT_ID, MeterProvider.noop()); @@ -197,6 +201,7 @@ public void run() { CompletableResultCode doRun() { CompletableResultCode flushResult = new CompletableResultCode(); if (exportAvailable.compareAndSet(true, false)) { + flushInProgress = flushResult; try { long startNanoTime = CLOCK.nanoTime(); String error = null; diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index eb7336e579e..1e74ffcaa9e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -34,11 +34,13 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -235,6 +237,85 @@ void close_CallsShutdown() throws IOException { verify(reader, times(1)).shutdown(); } + @Test + @Timeout(10) + void shutdown_whileExportInFlight_waitsThenPerformsFinalExport() throws Exception { + CompletableResultCode inflightExportResult = new CompletableResultCode(); + CountDownLatch exportStarted = new CountDownLatch(1); + AtomicInteger exportCount = new AtomicInteger(); + AtomicBoolean shutdownCalledWhileExportPending = new AtomicBoolean(); + + MetricExporter blockingExporter = + new MetricExporter() { + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return AggregationTemporality.CUMULATIVE; + } + + @Override + public CompletableResultCode export(Collection metrics) { + if (exportCount.incrementAndGet() == 1) { + exportStarted.countDown(); + return inflightExportResult; + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + if (!inflightExportResult.isDone()) { + shutdownCalledWhileExportPending.set(true); + } + return CompletableResultCode.ofSuccess(); + } + }; + + PeriodicMetricReader reader = + PeriodicMetricReader.builder(blockingExporter) + .setInterval(Duration.ofSeconds(Integer.MAX_VALUE)) + .build(); + reader.register(collectionRegistration); + + // Trigger an export that blocks + CompletableResultCode flushResult = reader.forceFlush(); + assertThat(exportStarted.await(5, TimeUnit.SECONDS)).isTrue(); + + // Shutdown in background — should block waiting for in-flight export + CountDownLatch shutdownDone = new CountDownLatch(1); + Thread shutdownThread = + new Thread( + () -> { + reader.shutdown(); + shutdownDone.countDown(); + }); + shutdownThread.setDaemon(true); + shutdownThread.start(); + + // Give shutdown() time to reach the flushInProgress.join() wait. + // Even if this executes before shutdown enters the wait, the assertions below still + // validate correctness — they just won't exercise the concurrent case. + Thread.sleep(200); + + // Release the in-flight export + inflightExportResult.succeed(); + + // Shutdown completes + assertThat(shutdownDone.await(5, TimeUnit.SECONDS)).isTrue(); + + // In-flight export succeeded + flushResult.join(5, TimeUnit.SECONDS); + assertThat(flushResult.isSuccess()).isTrue(); + // Final shutdown export also ran (in-flight + final = 2) + assertThat(exportCount.get()).isEqualTo(2); + // Exporter.shutdown() was not called while the in-flight export was still pending + assertThat(shutdownCalledWhileExportPending.get()).isFalse(); + } + @Test @SuppressWarnings("PreferJavaTimeOverload") // Testing the overload void invalidConfig() {