diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt
index a874b822c9f..b2ecdd07ca8 100644
--- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt
+++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt
@@ -1,2 +1,4 @@
Comparing source compatibility of opentelemetry-sdk-metrics-1.62.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.61.0.jar
-No changes.
\ No newline at end of file
+*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder setMaxExportBatchSize(int)
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java
new file mode 100644
index 00000000000..4db7d2ec0cb
--- /dev/null
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics.export;
+
+import io.opentelemetry.sdk.metrics.data.Data;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.data.SumData;
+import io.opentelemetry.sdk.metrics.data.SummaryPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Batches metric data into multiple batches based on the maximum export batch size. This is used by
+ * the {@link PeriodicMetricReader} to batch metric data before exporting it.
+ *
+ * <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
+ * at any time.
+ */
+class MetricExportBatcher {
+ private final int maxExportBatchSize;
+
+ /**
+ * Creates a new {@link MetricExportBatcher} with the given maximum export batch size.
+ *
+ * @param maxExportBatchSize The maximum number of {@link Data#getPoints()} in each export.
+ */
+ MetricExportBatcher(int maxExportBatchSize) {
+ if (maxExportBatchSize <= 0) {
+ throw new IllegalArgumentException("maxExportBatchSize must be positive");
+ }
+ this.maxExportBatchSize = maxExportBatchSize;
+ }
+
+ @Override
+ public String toString() {
+ return "MetricExportBatcher{maxExportBatchSize=" + maxExportBatchSize + "}";
+ }
+
+ /**
+ * Batches the given metric data into multiple batches based on the maximum export batch size.
+ *
+ * @param metrics The collection of metric data objects to batch based on the number of data
+ * points they contain.
+ * @return A collection of batches of metric data.
+ */
+ Collection<Collection<MetricData>> batchMetrics(Collection<MetricData> metrics) {
+ if (metrics.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Collection<Collection<MetricData>> preparedBatchesForExport = new ArrayList<>();
+ Collection<MetricData> currentBatch = new ArrayList<>(maxExportBatchSize);
+
+ // Fill active batch and split overlapping metric points if needed
+ for (MetricData metricData : metrics) {
+ MetricDataSplitOperationResult splitResult = prepareExportBatches(metricData, currentBatch);
+ preparedBatchesForExport.addAll(splitResult.getPreparedBatches());
+ currentBatch = splitResult.getLastInProgressBatch();
+ }
+
+ // Push trailing capacity block
+ if (!currentBatch.isEmpty()) {
+ preparedBatchesForExport.add(currentBatch);
+ }
+ return Collections.unmodifiableCollection(preparedBatchesForExport);
+ }
+
+ /**
+ * Prepares export batches from a single metric data object. This function only operates on a
+ * single metric data object, fills up the current batch with as many points as possible from the
+ * metric data object, and then creates new metric data objects for the remaining points.
+ *
+ * @param metricData The metric data object to split.
+ * @param currentBatch The current batch of metric data objects.
+ * @return A result containing the prepared batches and the last in-progress batch.
+ */
+ private MetricDataSplitOperationResult prepareExportBatches(
+ MetricData metricData, Collection<MetricData> currentBatch) {
+ int currentBatchPoints = 0;
+ for (MetricData m : currentBatch) {
+ currentBatchPoints += m.getData().getPoints().size();
+ }
+ int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatchPoints;
+ int totalPointsInMetricData = metricData.getData().getPoints().size();
+
+ if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) {
+ currentBatch.add(metricData);
+ return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch);
+ } else {
+ // Remaining capacity can't hold all points, partition existing metric data object
+ List<PointData> originalPointsList = new ArrayList<>(metricData.getData().getPoints());
+ Collection<Collection<MetricData>> preparedBatches = new ArrayList<>();
+ int currentIndex = 0;
+
+ while (currentIndex < totalPointsInMetricData) {
+ int pointsToTake =
+ Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch);
+
+ if (pointsToTake > 0) {
+ currentBatch.add(
+ copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake));
+ currentIndex += pointsToTake;
+ remainingCapacityInCurrentBatch -= pointsToTake;
+ }
+
+ if (remainingCapacityInCurrentBatch == 0) {
+ preparedBatches.add(currentBatch);
+ currentBatch = new ArrayList<>(maxExportBatchSize);
+ remainingCapacityInCurrentBatch = maxExportBatchSize;
+ }
+ }
+ return new MetricDataSplitOperationResult(preparedBatches, currentBatch);
+ }
+ }
+
+ private static MetricData copyMetricData(
+ MetricData original,
+ List<PointData> originalPointsList,
+ int dataPointsOffset,
+ int dataPointsToTake) {
+ List<PointData> points =
+ Collections.unmodifiableList(
+ new ArrayList<>(
+ originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake)));
+ return createMetricDataWithPoints(original, points);
+ }
+
+ /**
+ * Creates a new MetricData with the given points.
+ *
+ * @param original The original MetricData.
+ * @param points The points to use for the new MetricData.
+ * @return A new MetricData with the given points.
+ */
+ @SuppressWarnings("unchecked")
+ private static MetricData createMetricDataWithPoints(
+ MetricData original, Collection<PointData> points) {
+ switch (original.getType()) {
+ case DOUBLE_GAUGE:
+ return ImmutableMetricData.createDoubleGauge(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableGaugeData.create((Collection<DoublePointData>) (Collection<? extends PointData>) points));
+ case LONG_GAUGE:
+ return ImmutableMetricData.createLongGauge(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableGaugeData.create((Collection<LongPointData>) (Collection<? extends PointData>) points));
+ case DOUBLE_SUM:
+ SumData<DoublePointData> doubleSumData = original.getDoubleSumData();
+ return ImmutableMetricData.createDoubleSum(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableSumData.create(
+ doubleSumData.isMonotonic(),
+ doubleSumData.getAggregationTemporality(),
+ (Collection<DoublePointData>) (Collection<? extends PointData>) points));
+ case LONG_SUM:
+ SumData<LongPointData> longSumData = original.getLongSumData();
+ return ImmutableMetricData.createLongSum(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableSumData.create(
+ longSumData.isMonotonic(),
+ longSumData.getAggregationTemporality(),
+ (Collection<LongPointData>) (Collection<? extends PointData>) points));
+ case HISTOGRAM:
+ HistogramData histogramData = original.getHistogramData();
+ return ImmutableMetricData.createDoubleHistogram(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableHistogramData.create(
+ histogramData.getAggregationTemporality(),
+ (Collection<HistogramPointData>) (Collection<? extends PointData>) points));
+ case EXPONENTIAL_HISTOGRAM:
+ ExponentialHistogramData expHistogramData = original.getExponentialHistogramData();
+ return ImmutableMetricData.createExponentialHistogram(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableExponentialHistogramData.create(
+ expHistogramData.getAggregationTemporality(),
+ (Collection<ExponentialHistogramPointData>) (Collection<? extends PointData>) points));
+ case SUMMARY:
+ return ImmutableMetricData.createDoubleSummary(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableSummaryData.create((Collection<SummaryPointData>) (Collection<? extends PointData>) points));
+ }
+ throw new UnsupportedOperationException("Unsupported metric type: " + original.getType());
+ }
+
+ /**
+ * A data class to store the result of a split operation performed on a single {@link MetricData}
+ * object.
+ */
+ private static class MetricDataSplitOperationResult {
+ private final Collection<Collection<MetricData>> preparedBatches;
+ private final Collection<MetricData> lastInProgressBatch;
+
+ /**
+ * Creates a new MetricDataSplitOperationResult.
+ *
+ * @param preparedBatches The collection of prepared batches of metric data for export. Each
+ * batch of {@link MetricData} objects is guaranteed to have at most {@link
+ * #maxExportBatchSize} points.
+ * @param lastInProgressBatch The last batch that is still in progress. This batch may have less
+ * than {@link #maxExportBatchSize} points.
+ */
+ MetricDataSplitOperationResult(
+ Collection<Collection<MetricData>> preparedBatches,
+ Collection<MetricData> lastInProgressBatch) {
+ this.preparedBatches = preparedBatches;
+ this.lastInProgressBatch = lastInProgressBatch;
+ }
+
+ Collection<Collection<MetricData>> getPreparedBatches() {
+ return preparedBatches;
+ }
+
+ Collection<MetricData> getLastInProgressBatch() {
+ return lastInProgressBatch;
+ }
+ }
+}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java
index fe0d87bce94..7a0e3b2f0b3 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java
@@ -16,7 +16,9 @@
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -51,6 +53,7 @@ public final class PeriodicMetricReader implements MetricReader {
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
@Nullable private volatile ScheduledFuture<?> scheduledFuture;
+ @Nullable private final MetricExportBatcher metricsBatcher;
/**
* Returns a new {@link PeriodicMetricReader} which exports to the {@code exporter} once every
@@ -66,10 +69,14 @@ public static PeriodicMetricReaderBuilder builder(MetricExporter exporter) {
}
PeriodicMetricReader(
- MetricExporter exporter, long intervalNanos, ScheduledExecutorService scheduler) {
+ MetricExporter exporter,
+ long intervalNanos,
+ ScheduledExecutorService scheduler,
+ @Nullable MetricExportBatcher metricsBatcher) {
this.exporter = exporter;
this.intervalNanos = intervalNanos;
this.scheduler = scheduler;
+ this.metricsBatcher = metricsBatcher;
this.scheduled = new Scheduled();
}
@@ -160,6 +167,8 @@ public String toString() {
+ exporter
+ ", intervalNanos="
+ intervalNanos
+ + ", metricsBatcher="
+ + metricsBatcher
+ '}';
}
@@ -212,7 +221,18 @@ CompletableResultCode doRun() {
flushResult.succeed();
exportAvailable.set(true);
} else {
- CompletableResultCode result = exporter.export(metricData);
+ Collection<Collection<MetricData>> batches = null;
+ CompletableResultCode result;
+ if (metricsBatcher != null) {
+ batches = metricsBatcher.batchMetrics(metricData);
+ List<CompletableResultCode> results = new ArrayList<>(batches.size());
+ for (Collection<MetricData> batch : batches) {
+ results.add(exporter.export(batch));
+ }
+ result = CompletableResultCode.ofAll(results);
+ } else {
+ result = exporter.export(metricData);
+ }
result.whenComplete(
() -> {
if (!result.isSuccess()) {
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java
index 04cdd27506d..23c39fdc4ee 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java
@@ -30,6 +30,8 @@ public final class PeriodicMetricReaderBuilder {
@Nullable private ScheduledExecutorService executor;
+ @Nullable private MetricExportBatcher metricsBatcher;
+
PeriodicMetricReaderBuilder(MetricExporter metricExporter) {
this.metricExporter = metricExporter;
}
@@ -59,6 +61,20 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor
return this;
}
+ /**
+ * Sets the maximum number of data points to include in a single export batch. If unset, no
+ * batching will be performed. The maximum number of data points is considered across MetricData
+ * objects scheduled for export.
+ *
+ * @param maxExportBatchSize The maximum number of data points to include in a single export
+ * batch.
+ */
+ public PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) {
+ checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive");
+ this.metricsBatcher = new MetricExportBatcher(maxExportBatchSize);
+ return this;
+ }
+
/** Build a {@link PeriodicMetricReader} with the configuration of this builder. */
public PeriodicMetricReader build() {
ScheduledExecutorService executor = this.executor;
@@ -66,6 +82,6 @@ public PeriodicMetricReader build() {
executor =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("PeriodicMetricReader"));
}
- return new PeriodicMetricReader(metricExporter, intervalNanos, executor);
+ return new PeriodicMetricReader(metricExporter, intervalNanos, executor, metricsBatcher);
}
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java
new file mode 100644
index 00000000000..ef8c30dbd6e
--- /dev/null
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java
@@ -0,0 +1,771 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics.export;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.SummaryPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramBuckets;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile;
+import io.opentelemetry.sdk.resources.Resource;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+class MetricExportBatcherTest {
+
+ @Test
+ void constructor_InvalidMaxExportBatchSize() {
+ assertThatThrownBy(() -> new MetricExportBatcher(0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+ assertThatThrownBy(() -> new MetricExportBatcher(-1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+ }
+
+ @Test
+ void toString_Valid() {
+ MetricExportBatcher batcher = new MetricExportBatcher(10);
+ assertThat(batcher.toString()).isEqualTo("MetricExportBatcher{maxExportBatchSize=10}");
+ }
+
+ @Test
+ void batchMetrics_EmptyMetrics() {
+ MetricExportBatcher batcher = new MetricExportBatcher(10);
+ assertThat(batcher.batchMetrics(Collections.emptyList())).isEmpty();
+ }
+
+ @Test
+ void batchMetrics_MetricFitsIntact() {
+ MetricExportBatcher batcher = new MetricExportBatcher(10);
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Collections.singletonList(p1)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+ assertThat(batches).hasSize(1);
+ assertThat(batches.iterator().next()).containsExactly(metric);
+ }
+
+ @Test
+ void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() {
+ MetricExportBatcher batcher = new MetricExportBatcher(2);
+ DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0);
+ DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0);
+ DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0);
+ DoublePointData p4 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 4.0);
+ DoublePointData p5 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 5.0);
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+ List<Collection<MetricData>> batchesList = new ArrayList<>(batches);
+
+ assertThat(batchesList.size()).isEqualTo(3);
+ Collection<MetricData> firstBatch = batchesList.get(0);
+ Collection<MetricData> secondBatch = batchesList.get(1);
+ Collection<MetricData> thirdBatch = batchesList.get(2);
+
+ assertThat(firstBatch.size()).isEqualTo(1);
+ assertThat(secondBatch.size()).isEqualTo(1);
+ assertThat(thirdBatch.size()).isEqualTo(1);
+
+ MetricData b1m1 = firstBatch.iterator().next();
+ assertThat(b1m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE);
+ assertThat(b1m1.getName()).isEqualTo("name");
+ assertThat(b1m1.getDescription()).isEqualTo("desc");
+ assertThat(b1m1.getUnit()).isEqualTo("1");
+ assertThat(b1m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2);
+
+ MetricData b2m1 = secondBatch.iterator().next();
+ assertThat(b2m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE);
+ assertThat(b2m1.getName()).isEqualTo("name");
+ assertThat(b2m1.getDescription()).isEqualTo("desc");
+ assertThat(b2m1.getUnit()).isEqualTo("1");
+ assertThat(b2m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4);
+
+ // Last batch is partially filled.
+ MetricData b3m1 = thirdBatch.iterator().next();
+ assertThat(b3m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE);
+ assertThat(b3m1.getName()).isEqualTo("name");
+ assertThat(b3m1.getDescription()).isEqualTo("desc");
+ assertThat(b3m1.getUnit()).isEqualTo("1");
+ assertThat(b3m1.getDoubleGaugeData().getPoints()).containsExactly(p5);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_SingleBatchPartiallyFilled() {
+ MetricExportBatcher batcher = new MetricExportBatcher(4);
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+
+ assertThat(batches).hasSize(1);
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(1); // There is only 1 MetricData
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3);
+ }
+
+ @Test
+ void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() {
+ MetricExportBatcher batcher = new MetricExportBatcher(2);
+ DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0);
+ DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0);
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true, AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(1); // There is only 1 MetricData
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1, p2);
+ assertThat(m1.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m1.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics() {
+ MetricExportBatcher batcher = new MetricExportBatcher(1);
+ Attributes attrs1 = Attributes.builder().put("key1", "val1").build();
+ Attributes attrs2 = Attributes.builder().put("key2", "val2").build();
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L);
+
+ MetricData metric1 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc_1",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2)));
+
+ MetricData metric2 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc_2",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Arrays.asList(metric1, metric2));
+
+ assertThat(batches).hasSize(4);
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ Collection<MetricData> secondBatch = batches.stream().skip(1).findFirst().get();
+ Collection<MetricData> thirdBatch = batches.stream().skip(2).findFirst().get();
+ Collection<MetricData> fourthBatch = batches.stream().skip(3).findFirst().get();
+
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+ assertThat(thirdBatch).hasSize(1);
+ assertThat(fourthBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m1.getName()).isEqualTo("name_1");
+ assertThat(m1.getDescription()).isEqualTo("desc_1");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getLongSumData().getPoints()).containsExactly(p1);
+ assertThat(m1.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1);
+ assertThat(m1.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m1.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m2.getName()).isEqualTo("name_1");
+ assertThat(m2.getDescription()).isEqualTo("desc_1");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getLongSumData().getPoints()).containsExactly(p2);
+ assertThat(m2.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2);
+ assertThat(m2.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m2.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+
+ MetricData m3 = thirdBatch.iterator().next();
+ assertThat(m3.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m3.getName()).isEqualTo("name_2");
+ assertThat(m3.getDescription()).isEqualTo("desc_2");
+ assertThat(m3.getUnit()).isEqualTo("1");
+ assertThat(m3.getLongSumData().getPoints()).containsExactly(p1);
+ assertThat(m3.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1);
+ assertThat(m3.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m3.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+
+ MetricData m4 = fourthBatch.iterator().next();
+ assertThat(m4.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m4.getName()).isEqualTo("name_2");
+ assertThat(m4.getDescription()).isEqualTo("desc_2");
+ assertThat(m4.getUnit()).isEqualTo("1");
+ assertThat(m4.getLongSumData().getPoints()).containsExactly(p2);
+ assertThat(m4.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2);
+ assertThat(m4.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m4.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+ }
+
+ @Test
+ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() {
+ MetricExportBatcher batcher = new MetricExportBatcher(1);
+ ImmutableHistogramPointData p1 =
+ ImmutableHistogramPointData.create(
+ 1,
+ 2,
+ Attributes.empty(),
+ 1.0,
+ /* hasMin= */ false,
+ 0.0,
+ /* hasMax= */ false,
+ 0.0,
+ Collections.emptyList(),
+ Collections.singletonList(1L));
+ ImmutableHistogramPointData p2 =
+ ImmutableHistogramPointData.create(
+ 1,
+ 2,
+ Attributes.empty(),
+ 2.0,
+ /* hasMin= */ false,
+ 0.0,
+ /* hasMax= */ false,
+ 0.0,
+ Collections.emptyList(),
+ Collections.singletonList(2L));
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleHistogram(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableHistogramData.create(
+ AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+
+ assertThat(batches).hasSize(2);
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ Collection<MetricData> secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.HISTOGRAM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getHistogramData().getPoints()).containsExactly(p1);
+ assertThat(m1.getHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.HISTOGRAM);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getHistogramData().getPoints()).containsExactly(p2);
+ assertThat(m2.getHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_MultipleMetricsExactCapacityMatch() {
+ MetricExportBatcher batcher = new MetricExportBatcher(4);
+ Attributes attrs1 = Attributes.builder().put("k", "v1").build();
+ Attributes attrs2 = Attributes.builder().put("k", "v2").build();
+ Attributes attrs3 = Attributes.builder().put("k", "v3").build();
+ Attributes attrs4 = Attributes.builder().put("k", "v4").build();
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, attrs3, 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, attrs4, 4L);
+
+ MetricData m1 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2)));
+ MetricData m2 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p3, p4)));
+
+ Collection<Collection<MetricData>> batches = batcher.batchMetrics(Arrays.asList(m1, m2));
+ assertThat(batches).hasSize(1);
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ assertThat(firstBatch).containsExactly(m1, m2);
+
+ MetricData res1 = firstBatch.iterator().next();
+ MetricData res2 = firstBatch.stream().skip(1).findFirst().get();
+
+ assertThat(res1.getName()).isEqualTo("name_1");
+ assertThat(res1.getLongGaugeData().getPoints()).containsExactly(p1, p2);
+ assertThat(res2.getName()).isEqualTo("name_2");
+ assertThat(res2.getLongGaugeData().getPoints()).containsExactly(p3, p4);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_MultipleMetrics_ExceedsCapacity() {
+ MetricExportBatcher batcher = new MetricExportBatcher(4);
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L);
+ LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L);
+ LongPointData p6 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 6L);
+
+ MetricData m1 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3)));
+ MetricData m2 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p4, p5, p6)));
+
+ Collection<Collection<MetricData>> batches = batcher.batchMetrics(Arrays.asList(m1, m2));
+
+ assertThat(batches).hasSize(2);
+
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(2);
+ MetricData b1m1 = firstBatch.iterator().next();
+ MetricData b1m2 = firstBatch.stream().skip(1).findFirst().get();
+ assertThat(b1m1.getName()).isEqualTo("name_1");
+ assertThat(b1m1.getDescription()).isEqualTo("desc");
+ assertThat(b1m1.getUnit()).isEqualTo("1");
+ assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3);
+
+ assertThat(b1m2.getName()).isEqualTo("name_2");
+ assertThat(b1m2.getDescription()).isEqualTo("desc");
+ assertThat(b1m2.getUnit()).isEqualTo("1");
+ assertThat(b1m2.getLongGaugeData().getPoints()).containsExactly(p4);
+
+ Collection<MetricData> secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(secondBatch).hasSize(1);
+ MetricData b2m1 = secondBatch.iterator().next();
+ assertThat(b2m1.getName()).isEqualTo("name_2");
+ assertThat(b2m1.getDescription()).isEqualTo("desc");
+ assertThat(b2m1.getUnit()).isEqualTo("1");
+ assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p5, p6);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_MultipleMetrics_PerfectFillThenSplit() {
+ // m1 fills the batch completely (remaining capacity becomes 0).
+ // m2 has 3 points, which forces it to split from the start of a fully-exhausted
+ // previous pass.
+ // This test case fails if there is an empty batch
+ MetricExportBatcher batcher = new MetricExportBatcher(2);
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L);
+ LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L);
+
+ MetricData m1 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2)));
+ MetricData m2 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p3, p4, p5)));
+
+ Collection<Collection<MetricData>> batches = batcher.batchMetrics(Arrays.asList(m1, m2));
+
+ assertThat(batches).hasSize(3);
+
+ // Batch 1 should contain exactly m1 (p1, p2)
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(1);
+ MetricData b1m1 = firstBatch.iterator().next();
+ assertThat(b1m1.getName()).isEqualTo("name_1");
+ assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2);
+
+ // Batch 2 should contain the first part of m2 (p3, p4)
+ Collection<MetricData> secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(secondBatch).hasSize(1);
+ MetricData b2m1 = secondBatch.iterator().next();
+ assertThat(b2m1.getName()).isEqualTo("name_2");
+ assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p3, p4);
+
+ // Batch 3 should contain the rest of m2 (p5)
+ Collection<MetricData> thirdBatch = batches.stream().skip(2).findFirst().get();
+ assertThat(thirdBatch).hasSize(1);
+ MetricData b3m1 = thirdBatch.iterator().next();
+ assertThat(b3m1.getName()).isEqualTo("name_2");
+ assertThat(b3m1.getLongGaugeData().getPoints()).containsExactly(p5);
+ }
+
+ @Test
+ void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() {
+ MetricExportBatcher batcher = new MetricExportBatcher(1);
+ ExponentialHistogramBuckets buckets =
+ ImmutableExponentialHistogramBuckets.create(
+ /* scale= */ 20, /* offset= */ 0, /* bucketCounts= */ Collections.singletonList(1L));
+ ExponentialHistogramPointData p1 =
+ ImmutableExponentialHistogramPointData.create(
+ /* scale= */ 20,
+ /* sum= */ 1.0,
+ /* zeroCount= */ 0,
+ /* hasMin= */ false,
+ /* min= */ 0.0,
+ /* hasMax= */ false,
+ /* max= */ 0.0,
+ /* positiveBuckets= */ buckets,
+ /* negativeBuckets= */ buckets,
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* exemplars= */ Collections.emptyList());
+ ExponentialHistogramPointData p2 =
+ ImmutableExponentialHistogramPointData.create(
+ /* scale= */ 20,
+ /* sum= */ 2.0,
+ /* zeroCount= */ 0,
+ /* hasMin= */ false,
+ /* min= */ 0.0,
+ /* hasMax= */ false,
+ /* max= */ 0.0,
+ /* positiveBuckets= */ buckets,
+ /* negativeBuckets= */ buckets,
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* exemplars= */ Collections.emptyList());
+
+ MetricData metric =
+ ImmutableMetricData.createExponentialHistogram(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableExponentialHistogramData.create(
+ AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+
+ assertThat(batches).hasSize(2);
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ Collection<MetricData> secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getExponentialHistogramData().getPoints()).containsExactly(p1);
+ assertThat(m1.getExponentialHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getExponentialHistogramData().getPoints()).containsExactly(p2);
+ assertThat(m2.getExponentialHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_SplitsSummary_MultipleBatchesCompletelyFilled_SingleMetric() {
+ MetricExportBatcher batcher = new MetricExportBatcher(1);
+ SummaryPointData p1 =
+ ImmutableSummaryPointData.create(
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* count= */ 1,
+ /* sum= */ 1.0,
+ /* percentileValues= */ Collections.singletonList(
+ ImmutableValueAtQuantile.create(0.5, 1.0)));
+ SummaryPointData p2 =
+ ImmutableSummaryPointData.create(
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* count= */ 1,
+ /* sum= */ 2.0,
+ /* percentileValues= */ Collections.singletonList(
+ ImmutableValueAtQuantile.create(0.5, 2.0)));
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleSummary(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableSummaryData.create(Arrays.asList(p1, p2)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+
+ assertThat(batches).hasSize(2);
+ Collection<MetricData> firstBatch = batches.iterator().next();
+ Collection<MetricData> secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.SUMMARY);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getSummaryData().getPoints()).containsExactly(p1);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.SUMMARY);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getSummaryData().getPoints()).containsExactly(p2);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_MultipleBatches() {
+ MetricExportBatcher batcher = new MetricExportBatcher(2);
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L);
+ LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L);
+
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+ List<Collection<MetricData>> batchesList = new ArrayList<>(batches);
+
+ assertThat(batchesList).hasSize(3);
+ Collection<MetricData> firstBatch = batchesList.get(0);
+ Collection<MetricData> secondBatch = batchesList.get(1);
+ Collection<MetricData> thirdBatch = batchesList.get(2);
+
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+ assertThat(thirdBatch).hasSize(1);
+
+ MetricData firstBatchMetricData = firstBatch.iterator().next();
+ assertThat(firstBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(firstBatchMetricData.getName()).isEqualTo("name");
+ assertThat(firstBatchMetricData.getDescription()).isEqualTo("desc");
+ assertThat(firstBatchMetricData.getUnit()).isEqualTo("1");
+ assertThat(firstBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p1, p2);
+
+ MetricData secondBatchMetricData = secondBatch.iterator().next();
+ assertThat(secondBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(secondBatchMetricData.getName()).isEqualTo("name");
+ assertThat(secondBatchMetricData.getDescription()).isEqualTo("desc");
+ assertThat(secondBatchMetricData.getUnit()).isEqualTo("1");
+ assertThat(secondBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p3, p4);
+
+ MetricData thirdBatchMetricData = thirdBatch.iterator().next();
+ assertThat(thirdBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(thirdBatchMetricData.getName()).isEqualTo("name");
+ assertThat(thirdBatchMetricData.getDescription()).isEqualTo("desc");
+ assertThat(thirdBatchMetricData.getUnit()).isEqualTo("1");
+ assertThat(thirdBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p5);
+ }
+
+ @Test
+ void batchMetrics_SplitsDoubleSum_MultipleBatches() {
+ MetricExportBatcher batcher = new MetricExportBatcher(1);
+ DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0);
+ DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0);
+ DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0);
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ Arrays.asList(p1, p2, p3)));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+ List<Collection<MetricData>> batchesList = new ArrayList<>(batches);
+
+ assertThat(batchesList).hasSize(3);
+ Collection<MetricData> firstBatch = batchesList.get(0);
+ Collection<MetricData> secondBatch = batchesList.get(1);
+ Collection<MetricData> thirdBatch = batchesList.get(2);
+
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+ assertThat(thirdBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1);
+ assertThat(m1.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m1.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getDoubleSumData().getPoints()).containsExactly(p2);
+ assertThat(m2.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m2.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+
+ MetricData m3 = thirdBatch.iterator().next();
+ assertThat(m3.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m3.getName()).isEqualTo("name");
+ assertThat(m3.getDescription()).isEqualTo("desc");
+ assertThat(m3.getUnit()).isEqualTo("1");
+ assertThat(m3.getDoubleSumData().getPoints()).containsExactly(p3);
+ assertThat(m3.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m3.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_EmptyPointsInMetricData() {
+ MetricExportBatcher batcher = new MetricExportBatcher(2);
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Collections.emptyList()));
+
+ Collection<Collection<MetricData>> batches =
+ batcher.batchMetrics(Collections.singletonList(metric));
+ assertThat(batches).hasSize(1);
+ assertThat(batches.iterator().next()).containsExactly(metric);
+ }
+}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
index eb7336e579e..45b936045d8 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -53,8 +54,13 @@
@MockitoSettings(strictness = Strictness.LENIENT)
class PeriodicMetricReaderTest {
private static final List<LongPointData> LONG_POINT_LIST =
- Collections.singletonList(
- ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1234567));
+ Arrays.asList(
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 2L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 3L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 4L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 5L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 6L));
private static final MetricData METRIC_DATA =
ImmutableMetricData.createLongSum(
@@ -95,6 +101,19 @@ void startOnlyOnce() {
verify(scheduler, times(1)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());
}
+ @Test
+ void build_WithIllegalMaxExportSize() {
+ assertThatThrownBy(
+ () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(0).build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+
+ assertThatThrownBy(
+ () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(-1).build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+ }
+
@Test
void periodicExport() throws Exception {
WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
@@ -116,6 +135,103 @@ void periodicExport() throws Exception {
}
}
+ @Test
+ void periodicExport_WithMaxExportBatchSize_PartiallyFilledBatch() throws Exception {
+ WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(waitingMetricExporter)
+ .setInterval(Duration.ofMillis(100))
+ .setMaxExportBatchSize(4)
+ .build();
+
+ reader.register(collectionRegistration);
+ MetricData expectedMetricDataBatch1 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(0, 4)));
+ MetricData expectedMetricDataBatch2 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(4, 6)));
+ try {
+ assertThat(waitingMetricExporter.waitForNumberOfExports(2))
+ .containsExactly(
+ Collections.singletonList(expectedMetricDataBatch1),
+ Collections.singletonList(expectedMetricDataBatch2));
+ } finally {
+ reader.shutdown();
+ }
+ }
+
+ @Test
+ void periodicExport_WithMaxExportBatchSize_CompletelyFilledBatch() throws Exception {
+ WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(waitingMetricExporter)
+ .setInterval(Duration.ofMillis(100))
+ .setMaxExportBatchSize(2)
+ .build();
+
+ reader.register(collectionRegistration);
+ MetricData expectedMetricDataBatch1 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(0, 2)));
+ MetricData expectedMetricDataBatch2 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(2, 4)));
+
+ MetricData expectedMetricDataBatch3 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(4, 6)));
+ try {
+ assertThat(waitingMetricExporter.waitForNumberOfExports(3))
+ .containsExactly(
+ Collections.singletonList(expectedMetricDataBatch1),
+ Collections.singletonList(expectedMetricDataBatch2),
+ Collections.singletonList(expectedMetricDataBatch3));
+ } finally {
+ reader.shutdown();
+ }
+ }
+
@Test
void periodicExport_NoMetricsSkipsExport() {
WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
@@ -260,12 +376,14 @@ void stringRepresentation() {
assertThat(
PeriodicMetricReader.builder(metricExporter)
.setInterval(Duration.ofSeconds(1))
+ .setMaxExportBatchSize(200)
.build()
.toString())
.isEqualTo(
"PeriodicMetricReader{"
+ "exporter=MockMetricExporter{}, "
- + "intervalNanos=1000000000"
+ + "intervalNanos=1000000000, "
+ + "metricsBatcher=MetricExportBatcher{maxExportBatchSize=200}"
+ "}");
}