Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tieredstore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa
| Histogram | rocketmq_tiered_store_provider_upload_bytes | byte |
| Histogram | rocketmq_tiered_store_provider_download_bytes | byte |
| Gauge | rocketmq_tiered_store_dispatch_behind | |
| Gauge | rocketmq_tiered_store_dispatch_latency | byte |
| Gauge | rocketmq_tiered_store_dispatch_latency | milliseconds |
| Counter | rocketmq_tiered_store_messages_dispatch_total | |
| Counter | rocketmq_tiered_store_messages_out_total | |
| Counter | rocketmq_tiered_store_get_message_fallback_total | |
| Gauge | rocketmq_tiered_store_read_ahead_cache_count | |
| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | byte |
| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | bytes |
| Counter | rocketmq_tiered_store_read_ahead_cache_access_total | |
| Counter | rocketmq_tiered_store_read_ahead_cache_hit_total | |
| Gauge | rocketmq_storage_message_reserve_time | milliseconds |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild

dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY)
.setDescription("Tiered store dispatch latency")
.setUnit("seconds")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
Expand Down Expand Up @@ -261,7 +261,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
.ofLongs()
.buildWithCallback(measurement -> {
if (fetcher instanceof MessageStoreFetcherImpl) {
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().stats().loadCount();
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize();
measurement.record(count, newAttributesBuilder().build());
}
});
Expand All @@ -272,8 +272,10 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
.ofLongs()
.buildWithCallback(measurement -> {
if (fetcher instanceof MessageStoreFetcherImpl) {
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize();
measurement.record(count, newAttributesBuilder().build());
long bytes = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().policy().eviction()
.map(eviction -> eviction.weightedSize().orElse(0L))
.orElse(0L);
measurement.record(bytes, newAttributesBuilder().build());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,26 @@
*/
package org.apache.rocketmq.tieredstore.metrics;

import com.github.benmanes.caffeine.cache.Cache;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.apache.rocketmq.tieredstore.core.MessageStoreFetcherImpl;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -52,4 +65,98 @@ public void init() {
public void newAttributesBuilder() {
TieredStoreMetricsManager.newAttributesBuilder();
}

@Test
public void testCacheCountMetric() {
MessageStoreConfig storeConfig = new MessageStoreConfig();
TieredMessageStore messageStore = Mockito.mock(TieredMessageStore.class);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
// The fetcher will create real cache
MessageStoreFetcherImpl fetcher = new MessageStoreFetcherImpl(messageStore);

AtomicLong capturedCacheCount = new AtomicLong(-1);
Meter mockMeter = createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_COUNT, capturedCacheCount);

// Prepare cache before init so the gauge callback sees a populated cache instead of an empty one.
int[] bufferSizes = prepareTestCache(fetcher);

TieredStoreMetricsManager.init(mockMeter,
null, storeConfig, fetcher,
Mockito.mock(FlatFileStore.class), Mockito.mock(DefaultMessageStore.class));

// CacheCount gauge should report the number of cached entries.
Assert.assertEquals(bufferSizes.length, capturedCacheCount.get());
}

@Test
public void testCacheBytesMetric() {
MessageStoreConfig storeConfig = new MessageStoreConfig();
TieredMessageStore messageStore = Mockito.mock(TieredMessageStore.class);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
// The fetcher will create real cache
MessageStoreFetcherImpl fetcher = new MessageStoreFetcherImpl(messageStore);

AtomicLong capturedCacheBytes = new AtomicLong(-1);
Meter mockMeter = createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_BYTES, capturedCacheBytes);

// Prepare cache before init so the gauge callback sees a populated cache instead of an empty one.
int[] bufferSizes = prepareTestCache(fetcher);

TieredStoreMetricsManager.init(mockMeter,
null, storeConfig, fetcher,
Mockito.mock(FlatFileStore.class), Mockito.mock(DefaultMessageStore.class));

// CacheBytes gauge should report the sum of all cached buffer sizes.
int expectedSum = Arrays.stream(bufferSizes).sum();
Assert.assertEquals(expectedSum, capturedCacheBytes.get());
}

private Meter createMockMeter(String targetMetricName, AtomicLong capturedValue) {
Meter mockMeter = Mockito.mock(Meter.class, Mockito.RETURNS_DEEP_STUBS);

// Setup target gauge builder chain to capture the callback value
DoubleGaugeBuilder targetGaugeBuilder = Mockito.mock(DoubleGaugeBuilder.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(mockMeter.gaugeBuilder(targetMetricName)).thenReturn(targetGaugeBuilder);
Mockito.when(targetGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(targetGaugeBuilder);
Mockito.when(targetGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(targetGaugeBuilder);
Mockito.when(targetGaugeBuilder.ofLongs().buildWithCallback(Mockito.any(Consumer.class)))
.thenAnswer(invocation -> {
Consumer<ObservableLongMeasurement> callback = invocation.getArgument(0);
// Immediately invoke the callback to capture the current cache state
callback.accept(new ObservableLongMeasurement() {
@Override
public void record(long value) {
capturedValue.set(value);
}

@Override
public void record(long value, Attributes attributes) {
capturedValue.set(value);
}
});
return Mockito.mock(ObservableLongGauge.class);
});

return mockMeter;
}

private int[] prepareTestCache(MessageStoreFetcherImpl fetcher) {
Cache<String, SelectBufferResult> cache = fetcher.getFetcherCache();
String topic = "TestTopic";
MessageQueue mq1 = new MessageQueue(topic, "broker", 0);
MessageQueue mq2 = new MessageQueue(topic, "broker", 1);

int[] bufferSizes = {100, 200, 150, 300};
for (int i = 0; i < bufferSizes.length; i++) {
SelectBufferResult result = new SelectBufferResult(
ByteBuffer.allocate(bufferSizes[i]), 0L, bufferSizes[i], 0L);
MessageQueue mq = i < 2 ? mq1 : mq2;
String key = String.format("%s@%d@%d", mq.getTopic(), mq.getQueueId(), (i + 1) * 100L);
cache.put(key, result);
}
return bufferSizes;
}

}
Loading