From 08deab0d782ca5bb1cac12fe2f0975a85134c1b2 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Mon, 13 Apr 2026 20:35:54 +0000 Subject: [PATCH 01/18] WIP: Add metrics batcher in the SDK --- .../metrics/export/MetricExportBatcher.java | 219 ++++++++++++++++++ .../metrics/export/PeriodicMetricReader.java | 9 +- .../export/PeriodicMetricReaderBuilder.java | 10 +- 3 files changed, 236 insertions(+), 2 deletions(-) create mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java new file mode 100644 index 00000000000..9c8731ef03a --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -0,0 +1,219 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.export; + +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +class MetricExportBatcher { + private final int maxExportBatchSize; + + MetricExportBatcher(int maxExportBatchSize) { + this.maxExportBatchSize = maxExportBatchSize; + } + + 
/** + * Batches the given metric data into multiple batches based on the maximum export batch size. + * + * @param metrics The collection of metric data objects to batch based on the number of data + * points they contain. + * @return A collection of batches of metric data. + */ + Collection> batchMetrics(Collection metrics) { + if (metrics.isEmpty()) { + return Collections.emptyList(); + } + + Collection> batches = new ArrayList<>(); + int currentBatchRemainingCapacity = maxExportBatchSize; + + for (MetricData metricData : metrics) { + MetricDataSplitOperationResult splitResult = + splitMetricData(metricData, currentBatchRemainingCapacity); + batches.add(splitResult.getBatchedMetricData()); + currentBatchRemainingCapacity = splitResult.getLastBatchRemainingCapacity(); + } + + return Collections.unmodifiableCollection(batches); + } + + /** + * Splits a MetricData object into multiple MetricData objects if the number of points exceeds the + * remaining capacity in the current batch. This function tries to fill the current batch with as + * many points as possible from the given metric data. + * + *

If the number of points in the metric data is less than or equal to the remaining capacity + * in the current batch, it will return a single MetricData object with all the points. + * + *

If the number of points in the metric data is greater than the remaining capacity in the + * current batch, it will return multiple MetricData objects, each with a subset of the points + * from the original metric data. + * + * @param metricData The MetricData object to split. + * @param remainingCapacityInCurrentBatch The remaining capacity in the current batch being used. + * @return A MetricDataSplitOperationResult containing the batched metric data and the remaining + * capacity in the last batch. + */ + private MetricDataSplitOperationResult splitMetricData( + MetricData metricData, int remainingCapacityInCurrentBatch) { + int totalPointsInMetricData = metricData.getData().getPoints().size(); + if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { + // We have enough capacity in the current batch to fit all points in this + // MetricData + return new MetricDataSplitOperationResult( + Collections.singleton(metricData), + remainingCapacityInCurrentBatch - totalPointsInMetricData); + } else { + // We don't have enough capacity in the current batch. Split this MetricData + // into multiple MetricData objects. 
+ Collection splittedMetrics = new ArrayList<>(); + // List of all points in the metric data - to avoid creating a new one in each + // call to copyMetricData + List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); + + // Split the points into chunks of size maxExportBatchSize + // From the first chunk, take as many points as possible to fill current batch + int pointsToTake = remainingCapacityInCurrentBatch; + int currentIndex = 0; + + if (pointsToTake > 0) { + splittedMetrics.add( + copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentIndex = pointsToTake; + remainingCapacityInCurrentBatch -= pointsToTake; // should be 0 + } + + int remainingPoints = totalPointsInMetricData - currentIndex; + // Add remaining points in chunks of size maxExportBatchSize + while (currentIndex < totalPointsInMetricData) { + pointsToTake = Math.min(remainingPoints, maxExportBatchSize); + splittedMetrics.add( + copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentIndex += pointsToTake; + remainingPoints -= pointsToTake; + } + + int lastBatchRemainingCapacity = maxExportBatchSize - pointsToTake; + return new MetricDataSplitOperationResult(splittedMetrics, lastBatchRemainingCapacity); + } + } + + private static MetricData copyMetricData( + MetricData original, + List originalPointsList, + int dataPointsOffset, + int dataPointsToTake) { + List points = + originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake); + return createMetricDataWithPoints(original, points); + } + + /** + * Creates a new MetricData with the given points. + * + * @param original The original MetricData. + * @param points The points to use for the new MetricData. + * @return A new MetricData with the given points. 
+ */ + @SuppressWarnings("unchecked") + private static MetricData createMetricDataWithPoints( + MetricData original, Collection points) { + switch (original.getType()) { + case DOUBLE_GAUGE: + return ImmutableMetricData.createDoubleGauge( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableGaugeData.create((Collection) (Collection) points)); + case LONG_GAUGE: + return ImmutableMetricData.createLongGauge( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableGaugeData.create((Collection) (Collection) points)); + case DOUBLE_SUM: + SumData doubleSumData = original.getDoubleSumData(); + return ImmutableMetricData.createDoubleSum( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableSumData.create( + doubleSumData.isMonotonic(), + doubleSumData.getAggregationTemporality(), + (Collection) (Collection) points)); + case LONG_SUM: + SumData longSumData = original.getLongSumData(); + return ImmutableMetricData.createLongSum( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableSumData.create( + longSumData.isMonotonic(), + longSumData.getAggregationTemporality(), + (Collection) (Collection) points)); + case HISTOGRAM: + HistogramData histogramData = original.getHistogramData(); + return ImmutableMetricData.createDoubleHistogram( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableHistogramData.create( + histogramData.getAggregationTemporality(), + (Collection) (Collection) points)); + default: + throw new UnsupportedOperationException("Unsupported metric type: " + original.getType()); + 
} + } + + /** A result of a metric data split operation. */ + private static class MetricDataSplitOperationResult { + private final Collection batchedMetricData; + private final int lastBatchRemainingCapacity; + + /** + * Creates a new MetricDataSplitOperationResult. + * + * @param batchedMetricData The collection of batched metric data. + * @param lastBatchRemainingCapacity The remaining capacity in the last batch. + */ + MetricDataSplitOperationResult( + Collection batchedMetricData, int lastBatchRemainingCapacity) { + this.batchedMetricData = batchedMetricData; + this.lastBatchRemainingCapacity = lastBatchRemainingCapacity; + } + + Collection getBatchedMetricData() { + return batchedMetricData; + } + + int getLastBatchRemainingCapacity() { + return lastBatchRemainingCapacity; + } + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index fe0d87bce94..a28b551b7d5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -51,6 +51,7 @@ public final class PeriodicMetricReader implements MetricReader { private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop(); @Nullable private volatile ScheduledFuture scheduledFuture; + @Nullable private final MetricExportBatcher metricsBatcher; /** * Returns a new {@link PeriodicMetricReader} which exports to the {@code exporter} once every @@ -66,10 +67,14 @@ public static PeriodicMetricReaderBuilder builder(MetricExporter exporter) { } PeriodicMetricReader( - MetricExporter exporter, long intervalNanos, ScheduledExecutorService scheduler) { + MetricExporter exporter, + long intervalNanos, + ScheduledExecutorService scheduler, + @Nullable MetricExportBatcher metricsBatcher) { this.exporter = exporter; 
this.intervalNanos = intervalNanos; this.scheduler = scheduler; + this.metricsBatcher = metricsBatcher; this.scheduled = new Scheduled(); } @@ -160,6 +165,8 @@ public String toString() { + exporter + ", intervalNanos=" + intervalNanos + + ", metricsBatcher=" + + metricsBatcher + '}'; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java index 04cdd27506d..df4d387caef 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java @@ -30,6 +30,8 @@ public final class PeriodicMetricReaderBuilder { @Nullable private ScheduledExecutorService executor; + @Nullable private MetricExportBatcher metricsBatcher; + PeriodicMetricReaderBuilder(MetricExporter metricExporter) { this.metricExporter = metricExporter; } @@ -59,6 +61,12 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor return this; } + public PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) { + checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive"); + this.metricsBatcher = new MetricExportBatcher(maxExportBatchSize); + return this; + } + /** Build a {@link PeriodicMetricReader} with the configuration of this builder. 
*/ public PeriodicMetricReader build() { ScheduledExecutorService executor = this.executor; @@ -66,6 +74,6 @@ public PeriodicMetricReader build() { executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("PeriodicMetricReader")); } - return new PeriodicMetricReader(metricExporter, intervalNanos, executor); + return new PeriodicMetricReader(metricExporter, intervalNanos, executor, metricsBatcher); } } From 431af24f6959b34af2a420b8ae83335fb6506ea5 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Tue, 14 Apr 2026 15:15:38 +0000 Subject: [PATCH 02/18] Allow exporting metricData batches --- .../sdk/metrics/export/PeriodicMetricReader.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index a28b551b7d5..7a0e3b2f0b3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -16,7 +16,9 @@ import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -219,7 +221,18 @@ CompletableResultCode doRun() { flushResult.succeed(); exportAvailable.set(true); } else { - CompletableResultCode result = exporter.export(metricData); + Collection> batches = null; + CompletableResultCode result; + if (metricsBatcher != null) { + batches = metricsBatcher.batchMetrics(metricData); + List results = new ArrayList<>(batches.size()); + for (Collection batch : batches) { + 
results.add(exporter.export(batch)); + } + result = CompletableResultCode.ofAll(results); + } else { + result = exporter.export(metricData); + } result.whenComplete( () -> { if (!result.isSuccess()) { From 986fd6d8ea268d0c9826b0c5adb2847a64656a09 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Tue, 14 Apr 2026 15:32:32 +0000 Subject: [PATCH 03/18] Make existing tests compatible with changes --- .../sdk/metrics/export/MetricExportBatcher.java | 5 +++++ .../sdk/metrics/export/PeriodicMetricReaderTest.java | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 9c8731ef03a..288d0b3c3a3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -28,6 +28,11 @@ class MetricExportBatcher { this.maxExportBatchSize = maxExportBatchSize; } + @Override + public String toString() { + return "MetricExportBatcher{maxExportBatchSize=" + maxExportBatchSize + "}"; + } + /** * Batches the given metric data into multiple batches based on the maximum export batch size. 
* diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index eb7336e579e..ca49bae93c4 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -260,12 +260,14 @@ void stringRepresentation() { assertThat( PeriodicMetricReader.builder(metricExporter) .setInterval(Duration.ofSeconds(1)) + .setMaxExportBatchSize(200) .build() .toString()) .isEqualTo( "PeriodicMetricReader{" + "exporter=MockMetricExporter{}, " - + "intervalNanos=1000000000" + + "intervalNanos=1000000000, " + + "metricsBatcher=MetricExportBatcher{maxExportBatchSize=200}" + "}"); } From 78241ed3f6aa983e636af283a4e26bb22e745678 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Tue, 14 Apr 2026 19:40:25 +0000 Subject: [PATCH 04/18] Add docs and update existing tests --- .../sdk/metrics/export/MetricExportBatcher.java | 16 ++++++++++++++++ .../metrics/export/PeriodicMetricReaderTest.java | 13 +++++++++++++ 2 files changed, 29 insertions(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 288d0b3c3a3..00b0ba6da24 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.metrics.export; +import io.opentelemetry.sdk.metrics.data.Data; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.HistogramData; import io.opentelemetry.sdk.metrics.data.HistogramPointData; @@ -21,10 +22,25 @@ import java.util.Collections; import java.util.List; +/** + * Batches metric 
data into multiple batches based on the maximum export batch size. This is used by + * the {@link PeriodicMetricReader} to batch metric data before exporting it. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ class MetricExportBatcher { private final int maxExportBatchSize; + /** + * Creates a new {@link MetricExportBatcher} with the given maximum export batch size. + * + * @param maxExportBatchSize The maximum number of {@link Data#getPoints()} in each export. + */ MetricExportBatcher(int maxExportBatchSize) { + if (maxExportBatchSize <= 0) { + throw new IllegalArgumentException("maxExportBatchSize must be positive"); + } this.maxExportBatchSize = maxExportBatchSize; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index ca49bae93c4..f109af57510 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -95,6 +95,19 @@ void startOnlyOnce() { verify(scheduler, times(1)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any()); } + @Test + void build_withIllegalMaxExportSize() { + assertThatThrownBy( + () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + + assertThatThrownBy( + () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(-1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + } + @Test void periodicExport() throws Exception { WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); From 2543a51bb4c2da9a363ce52684a77a3e3eee7466 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 00:31:33 +0000 Subject: [PATCH 05/18] Fix MetricExportBatcher logic and add tests --- .../metrics/export/MetricExportBatcher.java | 131 
++++--- .../export/MetricExportBatcherTest.java | 367 ++++++++++++++++++ 2 files changed, 445 insertions(+), 53 deletions(-) create mode 100644 sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 00b0ba6da24..c71565bd055 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -61,77 +61,91 @@ Collection> batchMetrics(Collection metrics) return Collections.emptyList(); } - Collection> batches = new ArrayList<>(); - int currentBatchRemainingCapacity = maxExportBatchSize; + Collection> preparedBatchesForExport = new ArrayList<>(); + Collection currentBatch = new ArrayList<>(maxExportBatchSize); + // Iterate through each MetricData and fill up the current batch, splitting if + // necessary for (MetricData metricData : metrics) { - MetricDataSplitOperationResult splitResult = - splitMetricData(metricData, currentBatchRemainingCapacity); - batches.add(splitResult.getBatchedMetricData()); - currentBatchRemainingCapacity = splitResult.getLastBatchRemainingCapacity(); + MetricDataSplitOperationResult splitResult = prepareExportBatches(metricData, currentBatch); + preparedBatchesForExport.addAll(splitResult.getPreparedBatches()); + currentBatch = splitResult.getLastInProgressBatch(); } - return Collections.unmodifiableCollection(batches); + // Add the last in-progress batch if it is not empty + if (!currentBatch.isEmpty()) { + preparedBatchesForExport.add(currentBatch); + } + + return Collections.unmodifiableCollection(preparedBatchesForExport); } /** - * Splits a MetricData object into multiple MetricData objects if the number of points exceeds the - * remaining capacity in the current batch. 
This function tries to fill the current batch with as - * many points as possible from the given metric data. - * - *

If the number of points in the metric data is less than or equal to the remaining capacity - * in the current batch, it will return a single MetricData object with all the points. + * Prepares export batches from a single metric data object. This function only + * operates on a + * single metric data object, fills up the current batch with as many points as + * possible from the + * metric data object, and then creates new metric data objects for the + * remaining points. * - *

If the number of points in the metric data is greater than the remaining capacity in the - * current batch, it will return multiple MetricData objects, each with a subset of the points - * from the original metric data. - * - * @param metricData The MetricData object to split. - * @param remainingCapacityInCurrentBatch The remaining capacity in the current batch being used. - * @return A MetricDataSplitOperationResult containing the batched metric data and the remaining - * capacity in the last batch. + * @param metricData The metric data object to split. + * @param currentBatch The current batch of metric data objects. + * @return A result containing the prepared batches and the last in-progress + * batch. */ - private MetricDataSplitOperationResult splitMetricData( - MetricData metricData, int remainingCapacityInCurrentBatch) { + private MetricDataSplitOperationResult prepareExportBatches( + MetricData metricData, Collection currentBatch) { + int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatch.size(); int totalPointsInMetricData = metricData.getData().getPoints().size(); + if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { - // We have enough capacity in the current batch to fit all points in this - // MetricData - return new MetricDataSplitOperationResult( - Collections.singleton(metricData), - remainingCapacityInCurrentBatch - totalPointsInMetricData); + currentBatch.add(metricData); + return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch); } else { - // We don't have enough capacity in the current batch. Split this MetricData - // into multiple MetricData objects. 
- Collection splittedMetrics = new ArrayList<>(); - // List of all points in the metric data - to avoid creating a new one in each - // call to copyMetricData + // remaining capacity in current batch cannot hold all points from metric data + // split the metric data into multiple metric data objects List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); + Collection> preparedBatches = new ArrayList<>(); // Split the points into chunks of size maxExportBatchSize // From the first chunk, take as many points as possible to fill current batch int pointsToTake = remainingCapacityInCurrentBatch; int currentIndex = 0; + // fill the current batch and add it to prepared batches if (pointsToTake > 0) { - splittedMetrics.add( + currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); currentIndex = pointsToTake; - remainingCapacityInCurrentBatch -= pointsToTake; // should be 0 + preparedBatches.add(currentBatch); } + // If the current metric contains more data points than could fit into the + // filled batch above, + // we initialize a fresh batch to receive the spillover points on subsequent + // iterations. 
int remainingPoints = totalPointsInMetricData - currentIndex; + currentBatch = new ArrayList<>(maxExportBatchSize); + remainingCapacityInCurrentBatch = maxExportBatchSize; + // Add remaining points in chunks of size maxExportBatchSize - while (currentIndex < totalPointsInMetricData) { - pointsToTake = Math.min(remainingPoints, maxExportBatchSize); - splittedMetrics.add( + while (currentIndex < totalPointsInMetricData && remainingPoints > 0) { + // There are still more points in the current metricData + // Take as many points as possible to fill current batch up till remaining + // capacity + pointsToTake = Math.min(remainingPoints, remainingCapacityInCurrentBatch); + currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); currentIndex += pointsToTake; remainingPoints -= pointsToTake; + remainingCapacityInCurrentBatch -= pointsToTake; + if (remainingCapacityInCurrentBatch == 0) { + preparedBatches.add(currentBatch); + currentBatch = new ArrayList<>(maxExportBatchSize); + remainingCapacityInCurrentBatch = maxExportBatchSize; + } } - - int lastBatchRemainingCapacity = maxExportBatchSize - pointsToTake; - return new MetricDataSplitOperationResult(splittedMetrics, lastBatchRemainingCapacity); + return new MetricDataSplitOperationResult(preparedBatches, currentBatch); } } @@ -212,29 +226,40 @@ private static MetricData createMetricDataWithPoints( } } - /** A result of a metric data split operation. */ + /** + * A data class to store the result of a split operation performed on a single + * {@link MetricData} + * object. + */ private static class MetricDataSplitOperationResult { - private final Collection batchedMetricData; - private final int lastBatchRemainingCapacity; + private final Collection> preparedBatches; + private final Collection lastInProgressBatch; /** * Creates a new MetricDataSplitOperationResult. * - * @param batchedMetricData The collection of batched metric data. 
- * @param lastBatchRemainingCapacity The remaining capacity in the last batch. + * @param preparedBatches The collection of prepared batches of metric data + * for export. Each + * batch of {@link MetricData} objects is guaranteed + * to have at most {@link + * #maxExportBatchSize} points. + * @param lastInProgressBatch The last batch that is still in progress. This + * batch may have less + * than {@link #maxExportBatchSize} points. */ MetricDataSplitOperationResult( - Collection batchedMetricData, int lastBatchRemainingCapacity) { - this.batchedMetricData = batchedMetricData; - this.lastBatchRemainingCapacity = lastBatchRemainingCapacity; + Collection> preparedBatches, + Collection lastInProgressBatch) { + this.preparedBatches = preparedBatches; + this.lastInProgressBatch = lastInProgressBatch; } - Collection getBatchedMetricData() { - return batchedMetricData; + Collection> getPreparedBatches() { + return preparedBatches; } - int getLastBatchRemainingCapacity() { - return lastBatchRemainingCapacity; + Collection getLastInProgressBatch() { + return lastInProgressBatch; } } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java new file mode 100644 index 00000000000..b4f6a2abf0b --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -0,0 +1,367 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.export; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import 
io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Test; + +class MetricExportBatcherTest { + + @Test + void constructor_InvalidMaxExportBatchSize() { + assertThatThrownBy(() -> new MetricExportBatcher(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + assertThatThrownBy(() -> new MetricExportBatcher(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + } + + @Test + void toString_Valid() { + MetricExportBatcher batcher = new MetricExportBatcher(10); + assertThat(batcher.toString()).isEqualTo("MetricExportBatcher{maxExportBatchSize=10}"); + } + + @Test + void batchMetrics_EmptyMetrics() { + MetricExportBatcher batcher = new MetricExportBatcher(10); + assertThat(batcher.batchMetrics(Collections.emptyList())).isEmpty(); + } + + @Test + void batchMetrics_MetricFitsIntact() { + MetricExportBatcher batcher = new MetricExportBatcher(10); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + 
InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Collections.singletonList(p1))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(metric); + } + + @Test + @SuppressWarnings("all") + void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); + DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0); + DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0); + DoublePointData p4 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 4.0); + DoublePointData p5 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 5.0); + + MetricData metric = + ImmutableMetricData.createDoubleGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + List> batchesList = new ArrayList<>(batches); + + assertThat(batchesList.size()).isEqualTo(3); + Collection firstBatch = batchesList.get(0); + Collection secondBatch = batchesList.get(1); + Collection thirdBatch = batchesList.get(2); + + assertThat(firstBatch.size()).isEqualTo(1); + assertThat(secondBatch.size()).isEqualTo(1); + assertThat(thirdBatch.size()).isEqualTo(1); + + MetricData firsBatch_m1 = firstBatch.iterator().next(); + assertThat(firsBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(firsBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData secondBatch_m1 = secondBatch.iterator().next(); + assertThat(secondBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + 
assertThat(secondBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); + + // Last batch is partially filled. + MetricData thirdBatch_m1 = thirdBatch.iterator().next(); + assertThat(thirdBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(thirdBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p5); + } + + @Test + void batchMetrics_SplitsLongGauge_SingleBatchPartiallyFilled() { + MetricExportBatcher batcher = new MetricExportBatcher(4); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(1); + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(1); // There is only 1 MetricData + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3); + } + + @Test + void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); + DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0); + + MetricData metric = + ImmutableMetricData.createDoubleSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableSumData.create( + /* isMonotonic= */ true, AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2))); + + Collection> batches = + 
batcher.batchMetrics(Collections.singletonList(metric)); + + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(1); // There is only 1 MetricData + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1, p2); + assertThat(m1.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m1.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + + MetricData metric_1 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc_1", + "1", + ImmutableSumData.create( + /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); + + MetricData metric_2 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc_2", + "1", + ImmutableSumData.create( + /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Arrays.asList(metric_1, metric_2)); + + assertThat(batches).hasSize(4); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + Collection thirdBatch = batches.stream().skip(2).findFirst().get(); + Collection fourthBatch = batches.stream().skip(3).findFirst().get(); + + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + assertThat(thirdBatch).hasSize(1); + assertThat(fourthBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + 
assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m1.getName()).isEqualTo("name_1"); + assertThat(m1.getDescription()).isEqualTo("desc_1"); + assertThat(m1.getUnit()).isEqualTo("1"); + assertThat(m1.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m1.getLongSumData().isMonotonic()).isFalse(); + assertThat(m1.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m2.getName()).isEqualTo("name_1"); + assertThat(m2.getDescription()).isEqualTo("desc_1"); + assertThat(m2.getUnit()).isEqualTo("1"); + assertThat(m2.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m2.getLongSumData().isMonotonic()).isFalse(); + assertThat(m2.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + + MetricData m3 = thirdBatch.iterator().next(); + assertThat(m3.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m3.getName()).isEqualTo("name_2"); + assertThat(m3.getDescription()).isEqualTo("desc_2"); + assertThat(m3.getUnit()).isEqualTo("1"); + assertThat(m3.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m3.getLongSumData().isMonotonic()).isFalse(); + assertThat(m3.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + + MetricData m4 = fourthBatch.iterator().next(); + assertThat(m4.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m4.getName()).isEqualTo("name_2"); + assertThat(m4.getDescription()).isEqualTo("desc_2"); + assertThat(m4.getUnit()).isEqualTo("1"); + assertThat(m4.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m4.getLongSumData().isMonotonic()).isFalse(); + assertThat(m4.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + } + + @Test + void 
batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + ImmutableHistogramPointData p1 = + ImmutableHistogramPointData.create( + 1, + 2, + Attributes.empty(), + 1.0, + /* hasMin= */ false, + 0.0, + /* hasMax= */ false, + 0.0, + Collections.emptyList(), + Collections.singletonList(1L)); + ImmutableHistogramPointData p2 = + ImmutableHistogramPointData.create( + 1, + 2, + Attributes.empty(), + 2.0, + /* hasMin= */ false, + 0.0, + /* hasMax= */ false, + 0.0, + Collections.emptyList(), + Collections.singletonList(2L)); + + MetricData metric = + ImmutableMetricData.createDoubleHistogram( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableHistogramData.create( + AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(2); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m1.getHistogramData().getPoints()).containsExactly(p1); + assertThat(m1.getHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m2.getHistogramData().getPoints()).containsExactly(p2); + assertThat(m2.getHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void batchMetrics_EmptyPointsInMetricData() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + 
InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Collections.emptyList())); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(metric); + } + + @Test + void batchMetrics_MultipleMetricsExactCapacityMatch() { + MetricExportBatcher batcher = new MetricExportBatcher(4); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + + MetricData m1 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2))); + MetricData m2 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p3, p4))); + + Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(m1, m2); + } +} From 6fadc654e4286e4bd12ca3251724c27472fcae31 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 01:15:21 +0000 Subject: [PATCH 06/18] Add support for EXPONENTIAL_HISTOGRAM and SUMMARY data types --- .../metrics/export/MetricExportBatcher.java | 57 +++--- .../export/MetricExportBatcherTest.java | 163 ++++++++++++++++-- 2 files changed, 181 insertions(+), 39 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index c71565bd055..28896181136 100644 --- 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -7,16 +7,21 @@ import io.opentelemetry.sdk.metrics.data.Data; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.data.HistogramData; import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -81,17 +86,13 @@ Collection> batchMetrics(Collection metrics) } /** - * Prepares export batches from a single metric data object. This function only - * operates on a - * single metric data object, fills up the current batch with as many points as - * possible from the - * metric data object, and then creates new metric data objects for the - * remaining points. + * Prepares export batches from a single metric data object. 
This function only operates on a + * single metric data object, fills up the current batch with as many points as possible from the + * metric data object, and then creates new metric data objects for the remaining points. * - * @param metricData The metric data object to split. + * @param metricData The metric data object to split. * @param currentBatch The current batch of metric data objects. - * @return A result containing the prepared batches and the last in-progress - * batch. + * @return A result containing the prepared batches and the last in-progress batch. */ private MetricDataSplitOperationResult prepareExportBatches( MetricData metricData, Collection currentBatch) { @@ -221,14 +222,31 @@ private static MetricData createMetricDataWithPoints( ImmutableHistogramData.create( histogramData.getAggregationTemporality(), (Collection) (Collection) points)); - default: - throw new UnsupportedOperationException("Unsupported metric type: " + original.getType()); + case EXPONENTIAL_HISTOGRAM: + ExponentialHistogramData expHistogramData = original.getExponentialHistogramData(); + return ImmutableMetricData.createExponentialHistogram( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableExponentialHistogramData.create( + expHistogramData.getAggregationTemporality(), + (Collection) (Collection) points)); + case SUMMARY: + return ImmutableMetricData.createDoubleSummary( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableSummaryData.create((Collection) (Collection) points)); } + throw new UnsupportedOperationException("Unsupported metric type: " + original.getType()); } /** - * A data class to store the result of a split operation performed on a single - * {@link MetricData} + * A data class to store the result of a split operation performed on a single {@link MetricData} * 
object. */ private static class MetricDataSplitOperationResult { @@ -238,14 +256,11 @@ private static class MetricDataSplitOperationResult { /** * Creates a new MetricDataSplitOperationResult. * - * @param preparedBatches The collection of prepared batches of metric data - * for export. Each - * batch of {@link MetricData} objects is guaranteed - * to have at most {@link - * #maxExportBatchSize} points. - * @param lastInProgressBatch The last batch that is still in progress. This - * batch may have less - * than {@link #maxExportBatchSize} points. + * @param preparedBatches The collection of prepared batches of metric data for export. Each + * batch of {@link MetricData} objects is guaranteed to have at most {@link + * #maxExportBatchSize} points. + * @param lastInProgressBatch The last batch that is still in progress. This batch may have less + * than {@link #maxExportBatchSize} points. */ MetricDataSplitOperationResult( Collection> preparedBatches, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index b4f6a2abf0b..bf80bf56809 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -12,16 +12,25 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; import 
io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.Arrays; @@ -317,24 +326,6 @@ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() .isEqualTo(AggregationTemporality.CUMULATIVE); } - @Test - void batchMetrics_EmptyPointsInMetricData() { - MetricExportBatcher batcher = new MetricExportBatcher(2); - MetricData metric = - ImmutableMetricData.createLongGauge( - Resource.empty(), - InstrumentationScopeInfo.empty(), - "name", - "desc", - "1", - ImmutableGaugeData.create(Collections.emptyList())); - - Collection> batches = - batcher.batchMetrics(Collections.singletonList(metric)); - assertThat(batches).hasSize(1); - assertThat(batches.iterator().next()).containsExactly(metric); - } - @Test void batchMetrics_MultipleMetricsExactCapacityMatch() { MetricExportBatcher batcher = new MetricExportBatcher(4); @@ -364,4 +355,140 @@ void batchMetrics_MultipleMetricsExactCapacityMatch() { assertThat(batches).hasSize(1); 
assertThat(batches.iterator().next()).containsExactly(m1, m2); } + + @Test + void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + ExponentialHistogramBuckets buckets = + ImmutableExponentialHistogramBuckets.create( + /* scale= */ 20, /* offset= */ 0, /* bucketCounts= */ Collections.singletonList(1L)); + ExponentialHistogramPointData p1 = + ImmutableExponentialHistogramPointData.create( + /* scale= */ 20, + /* sum= */ 1.0, + /* zeroCount= */ 0, + /* hasMin= */ false, + /* min= */ 0.0, + /* hasMax= */ false, + /* max= */ 0.0, + /* positiveBuckets= */ buckets, + /* negativeBuckets= */ buckets, + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* exemplars= */ Collections.emptyList()); + ExponentialHistogramPointData p2 = + ImmutableExponentialHistogramPointData.create( + /* scale= */ 20, + /* sum= */ 2.0, + /* zeroCount= */ 0, + /* hasMin= */ false, + /* min= */ 0.0, + /* hasMax= */ false, + /* max= */ 0.0, + /* positiveBuckets= */ buckets, + /* negativeBuckets= */ buckets, + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* exemplars= */ Collections.emptyList()); + + MetricData metric = + ImmutableMetricData.createExponentialHistogram( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableExponentialHistogramData.create( + AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(2); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + 
assertThat(m1.getExponentialHistogramData().getPoints()).containsExactly(p1); + assertThat(m1.getExponentialHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m2.getExponentialHistogramData().getPoints()).containsExactly(p2); + assertThat(m2.getExponentialHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void batchMetrics_SplitsSummary_MultipleBatchesCompletelyFilled_SingleMetric() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + SummaryPointData p1 = + ImmutableSummaryPointData.create( + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* count= */ 1, + /* sum= */ 1.0, + /* percentileValues= */ Collections.singletonList( + ImmutableValueAtQuantile.create(0.5, 1.0))); + SummaryPointData p2 = + ImmutableSummaryPointData.create( + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* count= */ 1, + /* sum= */ 2.0, + /* percentileValues= */ Collections.singletonList( + ImmutableValueAtQuantile.create(0.5, 2.0))); + + MetricData metric = + ImmutableMetricData.createDoubleSummary( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableSummaryData.create(Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(2); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m1.getSummaryData().getPoints()).containsExactly(p1); + + MetricData m2 = 
secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m2.getSummaryData().getPoints()).containsExactly(p2); + } + + @Test + void batchMetrics_EmptyPointsInMetricData() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Collections.emptyList())); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(metric); + } } From 8153388ab8968863d14915f237e46fe02ff19ccf Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 01:28:13 +0000 Subject: [PATCH 07/18] Fix metrics checkstyle issue --- .../sdk/metrics/export/MetricExportBatcherTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index bf80bf56809..d957af29f96 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -192,7 +192,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); - MetricData metric_1 = + MetricData metric1 = ImmutableMetricData.createLongSum( Resource.empty(), InstrumentationScopeInfo.empty(), @@ -202,7 +202,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( ImmutableSumData.create( /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); - 
MetricData metric_2 = + MetricData metric2 = ImmutableMetricData.createLongSum( Resource.empty(), InstrumentationScopeInfo.empty(), @@ -213,7 +213,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); Collection> batches = - batcher.batchMetrics(Arrays.asList(metric_1, metric_2)); + batcher.batchMetrics(Arrays.asList(metric1, metric2)); assertThat(batches).hasSize(4); Collection firstBatch = batches.iterator().next(); From d85e3a2e4215f8bc46bcd80c51ce0aa7b9a0ec65 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 13:57:44 +0000 Subject: [PATCH 08/18] Add missing generated diff files --- docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt index a874b822c9f..b2ecdd07ca8 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt @@ -1,2 +1,4 @@ Comparing source compatibility of opentelemetry-sdk-metrics-1.62.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.61.0.jar -No changes. 
\ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder setMaxExportBatchSize(int) From bb860155142f984bfa387ba2b5ebf9ceda71e5b5 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 15:43:02 +0000 Subject: [PATCH 09/18] Update unit tests for enhanced coverage --- .../export/MetricExportBatcherTest.java | 182 +++++++++++++++++- 1 file changed, 175 insertions(+), 7 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index d957af29f96..949d9743b32 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -116,15 +116,24 @@ void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { MetricData firsBatch_m1 = firstBatch.iterator().next(); assertThat(firsBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(firsBatch_m1.getName()).isEqualTo("name"); + assertThat(firsBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(firsBatch_m1.getUnit()).isEqualTo("1"); assertThat(firsBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); MetricData secondBatch_m1 = secondBatch.iterator().next(); assertThat(secondBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(secondBatch_m1.getName()).isEqualTo("name"); + assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); assertThat(secondBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); // Last batch is partially filled. 
MetricData thirdBatch_m1 = thirdBatch.iterator().next(); assertThat(thirdBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(thirdBatch_m1.getName()).isEqualTo("name"); + assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); assertThat(thirdBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p5); } @@ -153,6 +162,9 @@ void batchMetrics_SplitsLongGauge_SingleBatchPartiallyFilled() { MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3); } @@ -180,6 +192,9 @@ void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() { MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1, p2); assertThat(m1.getDoubleSumData().isMonotonic()).isTrue(); assertThat(m1.getDoubleSumData().getAggregationTemporality()) @@ -189,8 +204,10 @@ void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() { @Test void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics() { MetricExportBatcher batcher = new MetricExportBatcher(1); - LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); - LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + Attributes attrs1 = Attributes.builder().put("key1", "val1").build(); + Attributes attrs2 = Attributes.builder().put("key2", "val2").build(); + LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L); + LongPointData p2 = 
ImmutableLongPointData.create(1, 2, attrs2, 2L); MetricData metric1 = ImmutableMetricData.createLongSum( @@ -232,6 +249,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m1.getDescription()).isEqualTo("desc_1"); assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m1.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1); assertThat(m1.getLongSumData().isMonotonic()).isFalse(); assertThat(m1.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -242,6 +260,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m2.getDescription()).isEqualTo("desc_1"); assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m2.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2); assertThat(m2.getLongSumData().isMonotonic()).isFalse(); assertThat(m2.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -252,6 +271,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m3.getDescription()).isEqualTo("desc_2"); assertThat(m3.getUnit()).isEqualTo("1"); assertThat(m3.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m3.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1); assertThat(m3.getLongSumData().isMonotonic()).isFalse(); assertThat(m3.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -262,6 +282,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m4.getDescription()).isEqualTo("desc_2"); assertThat(m4.getUnit()).isEqualTo("1"); assertThat(m4.getLongSumData().getPoints()).containsExactly(p2); + 
assertThat(m4.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2); assertThat(m4.getLongSumData().isMonotonic()).isFalse(); assertThat(m4.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -316,11 +337,17 @@ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getHistogramData().getPoints()).containsExactly(p1); assertThat(m1.getHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); MetricData m2 = secondBatch.iterator().next(); assertThat(m2.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getHistogramData().getPoints()).containsExactly(p2); assertThat(m2.getHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); @@ -329,10 +356,14 @@ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() @Test void batchMetrics_MultipleMetricsExactCapacityMatch() { MetricExportBatcher batcher = new MetricExportBatcher(4); - LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); - LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); - LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); - LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + Attributes attrs1 = Attributes.builder().put("k", "v1").build(); + Attributes attrs2 = Attributes.builder().put("k", "v2").build(); + Attributes attrs3 = Attributes.builder().put("k", "v3").build(); + Attributes attrs4 = 
Attributes.builder().put("k", "v4").build(); + LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, attrs3, 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, attrs4, 4L); MetricData m1 = ImmutableMetricData.createLongGauge( @@ -353,7 +384,16 @@ void batchMetrics_MultipleMetricsExactCapacityMatch() { Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); assertThat(batches).hasSize(1); - assertThat(batches.iterator().next()).containsExactly(m1, m2); + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).containsExactly(m1, m2); + + MetricData res1 = firstBatch.iterator().next(); + MetricData res2 = firstBatch.stream().skip(1).findFirst().get(); + + assertThat(res1.getName()).isEqualTo("name_1"); + assertThat(res1.getLongGaugeData().getPoints()).containsExactly(p1, p2); + assertThat(res2.getName()).isEqualTo("name_2"); + assertThat(res2.getLongGaugeData().getPoints()).containsExactly(p3, p4); } @Test @@ -414,12 +454,18 @@ void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_Sin MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getExponentialHistogramData().getPoints()).containsExactly(p1); assertThat(m1.getExponentialHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); MetricData m2 = secondBatch.iterator().next(); assertThat(m2.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); 
assertThat(m2.getExponentialHistogramData().getPoints()).containsExactly(p2); assertThat(m2.getExponentialHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); @@ -467,13 +513,135 @@ void batchMetrics_SplitsSummary_MultipleBatchesCompletelyFilled_SingleMetric() { MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getSummaryData().getPoints()).containsExactly(p1); MetricData m2 = secondBatch.iterator().next(); assertThat(m2.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getSummaryData().getPoints()).containsExactly(p2); } + @Test + void batchMetrics_SplitsLongGauge_MultipleBatches() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L); + + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + List> batchesList = new ArrayList<>(batches); + + assertThat(batchesList).hasSize(3); + Collection firstBatch = batchesList.get(0); + Collection secondBatch = batchesList.get(1); + Collection thirdBatch = batchesList.get(2); + + 
assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + assertThat(thirdBatch).hasSize(1); + + MetricData firstBatch_m1 = firstBatch.iterator().next(); + assertThat(firstBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(firstBatch_m1.getName()).isEqualTo("name"); + assertThat(firstBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(firstBatch_m1.getUnit()).isEqualTo("1"); + assertThat(firstBatch_m1.getLongGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData secondBatch_m1 = secondBatch.iterator().next(); + assertThat(secondBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(secondBatch_m1.getName()).isEqualTo("name"); + assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); + assertThat(secondBatch_m1.getLongGaugeData().getPoints()).containsExactly(p3, p4); + + MetricData thirdBatch_m1 = thirdBatch.iterator().next(); + assertThat(thirdBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(thirdBatch_m1.getName()).isEqualTo("name"); + assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); + assertThat(thirdBatch_m1.getLongGaugeData().getPoints()).containsExactly(p5); + } + + @Test + void batchMetrics_SplitsDoubleSum_MultipleBatches() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); + DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0); + DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0); + + MetricData metric = + ImmutableMetricData.createDoubleSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + Arrays.asList(p1, p2, p3))); + + Collection> batches = + 
batcher.batchMetrics(Collections.singletonList(metric)); + List> batchesList = new ArrayList<>(batches); + + assertThat(batchesList).hasSize(3); + Collection firstBatch = batchesList.get(0); + Collection secondBatch = batchesList.get(1); + Collection thirdBatch = batchesList.get(2); + + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + assertThat(thirdBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); + assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1); + assertThat(m1.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m1.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); + assertThat(m2.getDoubleSumData().getPoints()).containsExactly(p2); + assertThat(m2.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m2.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + MetricData m3 = thirdBatch.iterator().next(); + assertThat(m3.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m3.getName()).isEqualTo("name"); + assertThat(m3.getDescription()).isEqualTo("desc"); + assertThat(m3.getUnit()).isEqualTo("1"); + assertThat(m3.getDoubleSumData().getPoints()).containsExactly(p3); + assertThat(m3.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m3.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + @Test void batchMetrics_EmptyPointsInMetricData() { MetricExportBatcher batcher = new 
MetricExportBatcher(2); From 1bda08ca689f887910b424947f4fd36da524cca0 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 16:38:34 +0000 Subject: [PATCH 10/18] Add unit tests for PeriodicMetricReader --- .../export/PeriodicMetricReaderTest.java | 109 +++++++++++++++++- 1 file changed, 106 insertions(+), 3 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index f109af57510..45b936045d8 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -53,8 +54,13 @@ @MockitoSettings(strictness = Strictness.LENIENT) class PeriodicMetricReaderTest { private static final List LONG_POINT_LIST = - Collections.singletonList( - ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1234567)); + Arrays.asList( + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 2L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 3L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 4L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 5L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 6L)); private static final MetricData METRIC_DATA = ImmutableMetricData.createLongSum( @@ -96,7 +102,7 @@ void startOnlyOnce() { } @Test - void build_withIllegalMaxExportSize() { + void build_WithIllegalMaxExportSize() { assertThatThrownBy( () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(0).build()) 
.isInstanceOf(IllegalArgumentException.class) @@ -129,6 +135,103 @@ void periodicExport() throws Exception { } } + @Test + void periodicExport_WithMaxExportBatchSize_PartiallyFilledBatch() throws Exception { + WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); + PeriodicMetricReader reader = + PeriodicMetricReader.builder(waitingMetricExporter) + .setInterval(Duration.ofMillis(100)) + .setMaxExportBatchSize(4) + .build(); + + reader.register(collectionRegistration); + MetricData expectedMetricDataBatch1 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(0, 4))); + MetricData expectedMetricDataBatch2 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(4, 6))); + try { + assertThat(waitingMetricExporter.waitForNumberOfExports(2)) + .containsExactly( + Collections.singletonList(expectedMetricDataBatch1), + Collections.singletonList(expectedMetricDataBatch2)); + } finally { + reader.shutdown(); + } + } + + @Test + void periodicExport_WithMaxExportBatchSize_CompletelyFilledBatch() throws Exception { + WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); + PeriodicMetricReader reader = + PeriodicMetricReader.builder(waitingMetricExporter) + .setInterval(Duration.ofMillis(100)) + .setMaxExportBatchSize(2) + .build(); + + reader.register(collectionRegistration); + MetricData expectedMetricDataBatch1 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + 
"us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(0, 2))); + MetricData expectedMetricDataBatch2 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(2, 4))); + + MetricData expectedMetricDataBatch3 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(4, 6))); + try { + assertThat(waitingMetricExporter.waitForNumberOfExports(3)) + .containsExactly( + Collections.singletonList(expectedMetricDataBatch1), + Collections.singletonList(expectedMetricDataBatch2), + Collections.singletonList(expectedMetricDataBatch3)); + } finally { + reader.shutdown(); + } + } + @Test void periodicExport_NoMetricsSkipsExport() { WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); From 400981b8a8c7bc8c7b2c5e488a761ab0e50df3b4 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 16:42:23 +0000 Subject: [PATCH 11/18] Fix checkstyle issues --- .../export/MetricExportBatcherTest.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 949d9743b32..303d3bdb970 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -557,26 +557,26 @@ void 
batchMetrics_SplitsLongGauge_MultipleBatches() { assertThat(secondBatch).hasSize(1); assertThat(thirdBatch).hasSize(1); - MetricData firstBatch_m1 = firstBatch.iterator().next(); - assertThat(firstBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); - assertThat(firstBatch_m1.getName()).isEqualTo("name"); - assertThat(firstBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(firstBatch_m1.getUnit()).isEqualTo("1"); - assertThat(firstBatch_m1.getLongGaugeData().getPoints()).containsExactly(p1, p2); - - MetricData secondBatch_m1 = secondBatch.iterator().next(); - assertThat(secondBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); - assertThat(secondBatch_m1.getName()).isEqualTo("name"); - assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); - assertThat(secondBatch_m1.getLongGaugeData().getPoints()).containsExactly(p3, p4); - - MetricData thirdBatch_m1 = thirdBatch.iterator().next(); - assertThat(thirdBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); - assertThat(thirdBatch_m1.getName()).isEqualTo("name"); - assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); - assertThat(thirdBatch_m1.getLongGaugeData().getPoints()).containsExactly(p5); + MetricData firstBatchMetricData = firstBatch.iterator().next(); + assertThat(firstBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(firstBatchMetricData.getName()).isEqualTo("name"); + assertThat(firstBatchMetricData.getDescription()).isEqualTo("desc"); + assertThat(firstBatchMetricData.getUnit()).isEqualTo("1"); + assertThat(firstBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData secondBatchMetricData = secondBatch.iterator().next(); + assertThat(secondBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(secondBatchMetricData.getName()).isEqualTo("name"); + 
assertThat(secondBatchMetricData.getDescription()).isEqualTo("desc"); + assertThat(secondBatchMetricData.getUnit()).isEqualTo("1"); + assertThat(secondBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p3, p4); + + MetricData thirdBatchMetricData = thirdBatch.iterator().next(); + assertThat(thirdBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(thirdBatchMetricData.getName()).isEqualTo("name"); + assertThat(thirdBatchMetricData.getDescription()).isEqualTo("desc"); + assertThat(thirdBatchMetricData.getUnit()).isEqualTo("1"); + assertThat(thirdBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p5); } @Test From 95370fa028f6e35f7ad6c4563540215f8479b1af Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 16:51:32 +0000 Subject: [PATCH 12/18] Clean up inline code comments --- .../metrics/export/MetricExportBatcher.java | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 28896181136..0c6c7a62986 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -65,23 +65,20 @@ Collection> batchMetrics(Collection metrics) if (metrics.isEmpty()) { return Collections.emptyList(); } - Collection> preparedBatchesForExport = new ArrayList<>(); Collection currentBatch = new ArrayList<>(maxExportBatchSize); - // Iterate through each MetricData and fill up the current batch, splitting if - // necessary + // Fill active batch and split overlapping metric points if needed for (MetricData metricData : metrics) { MetricDataSplitOperationResult splitResult = prepareExportBatches(metricData, currentBatch); preparedBatchesForExport.addAll(splitResult.getPreparedBatches()); 
currentBatch = splitResult.getLastInProgressBatch(); } - // Add the last in-progress batch if it is not empty + // Push trailing capacity block if (!currentBatch.isEmpty()) { preparedBatchesForExport.add(currentBatch); } - return Collections.unmodifiableCollection(preparedBatchesForExport); } @@ -103,17 +100,14 @@ private MetricDataSplitOperationResult prepareExportBatches( currentBatch.add(metricData); return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch); } else { - // remaining capacity in current batch cannot hold all points from metric data - // split the metric data into multiple metric data objects + // Remaining capacity can't hold all points, partition existing metric data object List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); Collection> preparedBatches = new ArrayList<>(); - // Split the points into chunks of size maxExportBatchSize - // From the first chunk, take as many points as possible to fill current batch + // Fill current batch buffer completely int pointsToTake = remainingCapacityInCurrentBatch; int currentIndex = 0; - // fill the current batch and add it to prepared batches if (pointsToTake > 0) { currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); @@ -121,19 +115,13 @@ private MetricDataSplitOperationResult prepareExportBatches( preparedBatches.add(currentBatch); } - // If the current metric contains more data points than could fit into the - // filled batch above, - // we initialize a fresh batch to receive the spillover points on subsequent - // iterations. 
+ // Buffer spillover onto fresh partitions int remainingPoints = totalPointsInMetricData - currentIndex; currentBatch = new ArrayList<>(maxExportBatchSize); remainingCapacityInCurrentBatch = maxExportBatchSize; - // Add remaining points in chunks of size maxExportBatchSize + // Iterate extra chunks sized to exact transport constraints while (currentIndex < totalPointsInMetricData && remainingPoints > 0) { - // There are still more points in the current metricData - // Take as many points as possible to fill current batch up till remaining - // capacity pointsToTake = Math.min(remainingPoints, remainingCapacityInCurrentBatch); currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); From e470a0f1321e70b0dddbcc95a3faba12ee3a4a52 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 17:55:38 +0000 Subject: [PATCH 13/18] Fix bug for miscalculating remaining capacity of a batch --- .../metrics/export/MetricExportBatcher.java | 6 +- .../export/MetricExportBatcherTest.java | 92 +++++++++++++++---- 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 0c6c7a62986..623e17bea39 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -93,7 +93,11 @@ Collection> batchMetrics(Collection metrics) */ private MetricDataSplitOperationResult prepareExportBatches( MetricData metricData, Collection currentBatch) { - int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatch.size(); + int currentBatchPoints = 0; + for (MetricData m : currentBatch) { + currentBatchPoints += m.getData().getPoints().size(); + } + int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatchPoints; int 
totalPointsInMetricData = metricData.getData().getPoints().size(); if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 303d3bdb970..98f71ac5cea 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -114,27 +114,27 @@ void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { assertThat(secondBatch.size()).isEqualTo(1); assertThat(thirdBatch.size()).isEqualTo(1); - MetricData firsBatch_m1 = firstBatch.iterator().next(); - assertThat(firsBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); - assertThat(firsBatch_m1.getName()).isEqualTo("name"); - assertThat(firsBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(firsBatch_m1.getUnit()).isEqualTo("1"); - assertThat(firsBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); - - MetricData secondBatch_m1 = secondBatch.iterator().next(); - assertThat(secondBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); - assertThat(secondBatch_m1.getName()).isEqualTo("name"); - assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); - assertThat(secondBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); + MetricData b1m1 = firstBatch.iterator().next(); + assertThat(b1m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(b1m1.getName()).isEqualTo("name"); + assertThat(b1m1.getDescription()).isEqualTo("desc"); + assertThat(b1m1.getUnit()).isEqualTo("1"); + assertThat(b1m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData b2m1 = secondBatch.iterator().next(); + 
assertThat(b2m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(b2m1.getName()).isEqualTo("name"); + assertThat(b2m1.getDescription()).isEqualTo("desc"); + assertThat(b2m1.getUnit()).isEqualTo("1"); + assertThat(b2m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); // Last batch is partially filled. - MetricData thirdBatch_m1 = thirdBatch.iterator().next(); - assertThat(thirdBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); - assertThat(thirdBatch_m1.getName()).isEqualTo("name"); - assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); - assertThat(thirdBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p5); + MetricData b3m1 = thirdBatch.iterator().next(); + assertThat(b3m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(b3m1.getName()).isEqualTo("name"); + assertThat(b3m1.getDescription()).isEqualTo("desc"); + assertThat(b3m1.getUnit()).isEqualTo("1"); + assertThat(b3m1.getDoubleGaugeData().getPoints()).containsExactly(p5); } @Test @@ -396,6 +396,60 @@ void batchMetrics_MultipleMetricsExactCapacityMatch() { assertThat(res2.getLongGaugeData().getPoints()).containsExactly(p3, p4); } + @Test + void batchMetrics_SplitsLongGauge_MultipleMetrics_ExceedsCapacity() { + MetricExportBatcher batcher = new MetricExportBatcher(4); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L); + LongPointData p6 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 6L); + + MetricData m1 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + 
InstrumentationScopeInfo.empty(), + "name_1", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3))); + MetricData m2 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p4, p5, p6))); + + Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); + + assertThat(batches).hasSize(2); + + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(2); + MetricData b1m1 = firstBatch.iterator().next(); + MetricData b1m2 = firstBatch.stream().skip(1).findFirst().get(); + assertThat(b1m1.getName()).isEqualTo("name_1"); + assertThat(b1m1.getDescription()).isEqualTo("desc"); + assertThat(b1m1.getUnit()).isEqualTo("1"); + assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3); + + assertThat(b1m2.getName()).isEqualTo("name_2"); + assertThat(b1m2.getDescription()).isEqualTo("desc"); + assertThat(b1m2.getUnit()).isEqualTo("1"); + assertThat(b1m2.getLongGaugeData().getPoints()).containsExactly(p4); + + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(secondBatch).hasSize(1); + MetricData b2m1 = secondBatch.iterator().next(); + assertThat(b2m1.getName()).isEqualTo("name_2"); + assertThat(b2m1.getDescription()).isEqualTo("desc"); + assertThat(b2m1.getUnit()).isEqualTo("1"); + assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p5, p6); + } + @Test void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { MetricExportBatcher batcher = new MetricExportBatcher(1); From 03364b5abb4358d37ea07137036d293fa5d9731c Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 18:10:10 +0000 Subject: [PATCH 14/18] Add missing Javadoc for public facing API --- .../sdk/metrics/export/PeriodicMetricReaderBuilder.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java index df4d387caef..23c39fdc4ee 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java @@ -61,6 +61,14 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor return this; } + /** + * Sets the maximum number of data points to include in a single export batch. If unset, no + * batching will be performed. The maximum number of data points is considered across MetricData + * objects scheduled for export. + * + * @param maxExportBatchSize The maximum number of data points to include in a single export + * batch. + */ public PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) { checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive"); this.metricsBatcher = new MetricExportBatcher(maxExportBatchSize); From 73c380de3e8212c227865055ffd7abf77c29ca8a Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 20:43:08 +0000 Subject: [PATCH 15/18] Refactor logic in prepareExportBatches to remove redundancy --- .../metrics/export/MetricExportBatcher.java | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 623e17bea39..c9a9ad3f500 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -107,31 +107,16 @@ private MetricDataSplitOperationResult prepareExportBatches( // Remaining capacity can't hold all points, 
partition existing metric data object List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); Collection> preparedBatches = new ArrayList<>(); - - // Fill current batch buffer completely - int pointsToTake = remainingCapacityInCurrentBatch; int currentIndex = 0; - if (pointsToTake > 0) { - currentBatch.add( - copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); - currentIndex = pointsToTake; - preparedBatches.add(currentBatch); - } - - // Buffer spillover onto fresh partitions - int remainingPoints = totalPointsInMetricData - currentIndex; - currentBatch = new ArrayList<>(maxExportBatchSize); - remainingCapacityInCurrentBatch = maxExportBatchSize; - - // Iterate extra chunks sized to exact transport constraints - while (currentIndex < totalPointsInMetricData && remainingPoints > 0) { - pointsToTake = Math.min(remainingPoints, remainingCapacityInCurrentBatch); + while (currentIndex < totalPointsInMetricData) { + int pointsToTake = + Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch); currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); currentIndex += pointsToTake; - remainingPoints -= pointsToTake; remainingCapacityInCurrentBatch -= pointsToTake; + if (remainingCapacityInCurrentBatch == 0) { preparedBatches.add(currentBatch); currentBatch = new ArrayList<>(maxExportBatchSize); From 67509b596618b81e1579c56f5828d7b1bced3063 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 16 Apr 2026 15:58:02 +0000 Subject: [PATCH 16/18] Address comment about defensive copy for original point sublist --- .../opentelemetry/sdk/metrics/export/MetricExportBatcher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index c9a9ad3f500..30adbd9c58f 100644 --- 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -133,7 +133,9 @@ private static MetricData copyMetricData( int dataPointsOffset, int dataPointsToTake) { List points = - originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake); + Collections.unmodifiableList( + new ArrayList<>( + originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake))); return createMetricDataWithPoints(original, points); } From ea6330ee6e9fca561860ef9e3f323bfca085dca1 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 16 Apr 2026 16:16:40 +0000 Subject: [PATCH 17/18] Prevent copying MetricData for 0 points --- .../sdk/metrics/export/MetricExportBatcher.java | 11 +++++++---- .../sdk/metrics/export/MetricExportBatcherTest.java | 1 - 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 30adbd9c58f..4db7d2ec0cb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -112,10 +112,13 @@ private MetricDataSplitOperationResult prepareExportBatches( while (currentIndex < totalPointsInMetricData) { int pointsToTake = Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch); - currentBatch.add( - copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); - currentIndex += pointsToTake; - remainingCapacityInCurrentBatch -= pointsToTake; + + if (pointsToTake > 0) { + currentBatch.add( + copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentIndex += pointsToTake; + remainingCapacityInCurrentBatch -= pointsToTake; + } if 
(remainingCapacityInCurrentBatch == 0) { preparedBatches.add(currentBatch); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 98f71ac5cea..48f2132c946 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -83,7 +83,6 @@ void batchMetrics_MetricFitsIntact() { } @Test - @SuppressWarnings("all") void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { MetricExportBatcher batcher = new MetricExportBatcher(2); DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); From 34ca25a8054f461c95f878dd87da9be5540166fe Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 16 Apr 2026 18:08:41 +0000 Subject: [PATCH 18/18] Add test case to verify there are no batches with empty metric points --- .../export/MetricExportBatcherTest.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 48f2132c946..ef8c30dbd6e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -449,6 +449,62 @@ void batchMetrics_SplitsLongGauge_MultipleMetrics_ExceedsCapacity() { assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p5, p6); } + @Test + void batchMetrics_SplitsLongGauge_MultipleMetrics_PerfectFillThenSplit() { + // m1 fills the first batch exactly to capacity (remaining capacity becomes 0). + // m2 has 3 points, so it must begin in a fresh batch and is then split across two batches.
+ // This test case fails if there is an empty batch + MetricExportBatcher batcher = new MetricExportBatcher(2); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L); + + MetricData m1 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2))); + MetricData m2 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p3, p4, p5))); + + Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); + + assertThat(batches).hasSize(3); + + // Batch 1 should contain exactly m1 (p1, p2) + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(1); + MetricData b1m1 = firstBatch.iterator().next(); + assertThat(b1m1.getName()).isEqualTo("name_1"); + assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2); + + // Batch 2 should contain the first part of m2 (p3, p4) + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(secondBatch).hasSize(1); + MetricData b2m1 = secondBatch.iterator().next(); + assertThat(b2m1.getName()).isEqualTo("name_2"); + assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p3, p4); + + // Batch 3 should contain the rest of m2 (p5) + Collection thirdBatch = batches.stream().skip(2).findFirst().get(); + assertThat(thirdBatch).hasSize(1); + MetricData b3m1 = thirdBatch.iterator().next(); + assertThat(b3m1.getName()).isEqualTo("name_2"); + 
assertThat(b3m1.getLongGaugeData().getPoints()).containsExactly(p5); + } + @Test void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { MetricExportBatcher batcher = new MetricExportBatcher(1);