diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WeightedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WeightedValue.java index b6f43f723929..539dcccb1216 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WeightedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WeightedValue.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import java.util.Objects; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; /** @@ -47,4 +49,26 @@ public long getWeight() { public T getValue() { return value; } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof WeightedValue)) { + return false; + } + WeightedValue that = (WeightedValue) o; + return weight == that.weight && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value, weight); + } + + @Override + public String toString() { + return "WeightedValue{value=" + value + ", weight=" + weight + "}"; + } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java index 03b70afd3244..3164fb241be4 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java @@ -70,4 +70,7 @@ interface Shrinkable { /** Removes the mapping for a key from the cache if it is present. */ void remove(K key); + + /** Returns a string containing caching statistics. */ + String describeStats(); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java index 95091fd83c1a..c77fb83ff65a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java @@ -23,13 +23,16 @@ import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import org.apache.beam.fn.harness.Cache.Shrinkable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.SdkHarnessOptions; import org.apache.beam.sdk.util.Weighted; +import org.apache.beam.sdk.util.WeightedValue; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheStats; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Weigher; @@ -43,7 +46,12 @@ public final class Caches { private static final Logger LOG = LoggerFactory.getLogger(Caches.class); - private static final int WEIGHT_RATIO = 64; + /** + * Object sizes will always be rounded up to the next multiple of {@code 2^WEIGHT_RATIO} when + * stored in the cache. This allows us to work around the limit on the Guava cache method which + * only allows int weights by scaling object sizes appropriately. + */ + @VisibleForTesting static final int WEIGHT_RATIO = 6; private static final MemoryMeter MEMORY_METER = MemoryMeter.builder().withGuessing(Guess.BEST).build(); @@ -57,31 +65,39 @@ public static long weigh(Object o) { /** An eviction listener that reduces the size of entries that are {@link Shrinkable}. */ @VisibleForTesting - static class ShrinkOnEviction implements RemovalListener { + static class ShrinkOnEviction implements RemovalListener> { private final org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache< - CompositeKey, Object> + CompositeKey, WeightedValue> cache; + private final LongAdder weightInBytes; - ShrinkOnEviction(CacheBuilder cacheBuilder) { + ShrinkOnEviction( + CacheBuilder> cacheBuilder, LongAdder weightInBytes) { this.cache = cacheBuilder.removalListener(this).build(); + this.weightInBytes = weightInBytes; } public org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache< - CompositeKey, Object> + CompositeKey, WeightedValue> getCache() { return cache; } @Override - public void onRemoval(RemovalNotification removalNotification) { + public void onRemoval( + RemovalNotification> removalNotification) { + weightInBytes.add( + -(removalNotification.getKey().getWeight() + removalNotification.getValue().getWeight())); if (removalNotification.wasEvicted()) { - if (!(removalNotification.getValue() instanceof Cache.Shrinkable)) { + if (!(removalNotification.getValue().getValue() instanceof Cache.Shrinkable)) { return; } - Object updatedEntry = ((Shrinkable) removalNotification.getValue()).shrink(); + Object updatedEntry = ((Shrinkable) removalNotification.getValue().getValue()).shrink(); if (updatedEntry != null) { - cache.put(removalNotification.getKey(), updatedEntry); + cache.put( + removalNotification.getKey(), + addWeightedValue(removalNotification.getKey(), updatedEntry, weightInBytes)); } } } @@ -89,19 +105,12 @@ public void onRemoval(RemovalNotification removalNotificat /** A cache that never stores any values. */ public static Cache noop() { - // We specifically use Guava cache since it allows for recursive computeIfAbsent calls - // preventing deadlock from occurring when a loading function mutates the underlying cache - return (Cache) - forCache(new ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(0)).getCache()); + return forMaximumBytes(0L); } /** A cache that never evicts any values. */ public static Cache eternal() { - // We specifically use Guava cache since it allows for recursive computeIfAbsent calls - // preventing deadlock from occurring when a loading function mutates the underlying cache - return (Cache) - forCache( - new ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(Long.MAX_VALUE)).getCache()); + return forMaximumBytes(Long.MAX_VALUE); } /** @@ -109,40 +118,8 @@ public static Cache eternal() { * parameters within {@link SdkHarnessOptions}. */ public static Cache fromOptions(PipelineOptions options) { - // We specifically use Guava cache since it allows for recursive computeIfAbsent calls - // preventing deadlock from occurring when a loading function mutates the underlying cache - return (Cache) - forCache( - new ShrinkOnEviction( - CacheBuilder.newBuilder() - .maximumWeight( - options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() - * 1024L - * 1024L - / WEIGHT_RATIO) - .weigher( - new Weigher() { - - @Override - public int weigh(Object key, Object value) { - long size; - if (value instanceof Weighted) { - size = Caches.weigh(key) + ((Weighted) value).getWeight(); - } else { - size = Caches.weigh(key) + Caches.weigh(value); - } - size = size / WEIGHT_RATIO + 1; - if (size >= Integer.MAX_VALUE) { - LOG.warn( - "Entry with size {} MiBs inserted into the cache. This is larger than the maximum individual entry size of {} MiBs. The cache will under report its memory usage by the difference. This may lead to OutOfMemoryErrors.", - (size / 1048576L) + 1, - 2 * WEIGHT_RATIO * 1024); - return Integer.MAX_VALUE; - } - return (int) size; - } - })) - .getCache()); + return forMaximumBytes( + ((long) options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb()) << 20); } /** @@ -156,7 +133,9 @@ public static Cache subCache( if (cache instanceof SubCache) { return new SubCache<>( ((SubCache) cache).cache, - ((SubCache) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix)); + ((SubCache) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix), + ((SubCache) cache).maxWeightInBytes, + ((SubCache) cache).weightInBytes); } throw new IllegalArgumentException( String.format( @@ -165,10 +144,69 @@ public static Cache subCache( } @VisibleForTesting - static Cache forCache( - org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache - cache) { - return new SubCache<>(cache, CompositeKeyPrefix.ROOT); + static Cache forMaximumBytes(long maximumBytes) { + // We specifically use Guava cache since it allows for recursive computeIfAbsent calls + // preventing deadlock from occurring when a loading function mutates the underlying cache + LongAdder weightInBytes = new LongAdder(); + return new SubCache<>( + new ShrinkOnEviction( + CacheBuilder.newBuilder() + .maximumWeight(maximumBytes >> WEIGHT_RATIO) + .weigher( + new Weigher>() { + + @Override + public int weigh(CompositeKey key, WeightedValue value) { + // Round up to the next closest multiple of WEIGHT_RATIO + long size = + ((key.getWeight() + value.getWeight() - 1) >> WEIGHT_RATIO) + 1; + if (size > Integer.MAX_VALUE) { + LOG.warn( + "Entry with size {} MiBs inserted into the cache. This is larger than the maximum individual entry size of {} MiBs. The cache will under report its memory usage by the difference. This may lead to OutOfMemoryErrors.", + ((size - 1) >> 20) + 1, + 2 << (WEIGHT_RATIO + 10)); + return Integer.MAX_VALUE; + } + return (int) size; + } + }) + // The maximum size of an entry in the cache is maxWeight / concurrencyLevel + // which is why we set the concurrency level to 1. See + // https://github.com/google/guava/issues/3462 for further details. + // + // The ProcessBundleBenchmark#testStateWithCaching shows no noticeable change + // when this parameter is left at the default. + .concurrencyLevel(1) + .recordStats(), + weightInBytes) + .getCache(), + CompositeKeyPrefix.ROOT, + maximumBytes, + weightInBytes); + } + + private static long findWeight(Object o) { + if (o instanceof WeightedValue) { + return ((WeightedValue) o).getWeight(); + } else if (o instanceof Weighted) { + return ((Weighted) o).getWeight(); + } else { + return weigh(o); + } + } + + private static WeightedValue addWeightedValue( + CompositeKey key, Object o, LongAdder weightInBytes) { + WeightedValue rval; + if (o instanceof WeightedValue) { + rval = (WeightedValue) o; + } else if (o instanceof Weighted) { + rval = WeightedValue.of(o, ((Weighted) o).getWeight()); + } else { + rval = WeightedValue.of(o, weigh(o)); + } + weightInBytes.add(key.getWeight() + rval.getWeight()); + return rval; } /** @@ -179,27 +217,44 @@ static Cache forCache( */ private static class SubCache implements Cache { private final org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache< - CompositeKey, Object> + CompositeKey, WeightedValue> cache; private final CompositeKeyPrefix keyPrefix; + private final long maxWeightInBytes; + private final LongAdder weightInBytes; SubCache( - org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache + org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache< + CompositeKey, WeightedValue> cache, - CompositeKeyPrefix keyPrefix) { + CompositeKeyPrefix keyPrefix, + long maxWeightInBytes, + LongAdder weightInBytes) { this.cache = cache; this.keyPrefix = keyPrefix; + this.maxWeightInBytes = maxWeightInBytes; + this.weightInBytes = weightInBytes; } @Override public V peek(K key) { - return (V) cache.getIfPresent(keyPrefix.valueKey(key)); + WeightedValue value = cache.getIfPresent(keyPrefix.valueKey(key)); + if (value == null) { + return null; + } + return (V) value.getValue(); } @Override public V computeIfAbsent(K key, Function loadingFunction) { try { - return (V) cache.get(keyPrefix.valueKey(key), () -> loadingFunction.apply(key)); + CompositeKey compositeKey = keyPrefix.valueKey(key); + return (V) + cache + .get( + compositeKey, + () -> addWeightedValue(compositeKey, loadingFunction.apply(key), weightInBytes)) + .getValue(); } catch (ExecutionException e) { throw new RuntimeException(e); } @@ -207,23 +262,40 @@ public V computeIfAbsent(K key, Function loadingFunction) { @Override public void put(K key, V value) { - cache.put(keyPrefix.valueKey(key), value); + CompositeKey compositeKey = keyPrefix.valueKey(key); + cache.put(compositeKey, addWeightedValue(compositeKey, value, weightInBytes)); } @Override public void remove(K key) { cache.invalidate(keyPrefix.valueKey(key)); } + + @Override + public String describeStats() { + CacheStats stats = cache.stats(); + return String.format( + "used/max %d/%d MB, hit %.2f%%, lookups %d, avg load time %.0f ns, loads %d, evictions %d", + weightInBytes.longValue() >> 20, + maxWeightInBytes >> 20, + stats.hitRate() * 100., + stats.requestCount(), + stats.averageLoadPenalty(), + stats.loadCount(), + stats.evictionCount()); + } } /** A key prefix used to generate keys that are stored within a sub-cache. */ static class CompositeKeyPrefix { - public static final CompositeKeyPrefix ROOT = new CompositeKeyPrefix(new Object[0]); + public static final CompositeKeyPrefix ROOT = new CompositeKeyPrefix(new Object[0], 0); private final Object[] namespace; + private final long weight; - private CompositeKeyPrefix(Object[] namespace) { + private CompositeKeyPrefix(Object[] namespace, long weight) { this.namespace = namespace; + this.weight = weight; } CompositeKeyPrefix subKey(Object suffix, Object... additionalSuffixes) { @@ -232,11 +304,15 @@ CompositeKeyPrefix subKey(Object suffix, Object... additionalSuffixes) { subKey[namespace.length] = suffix; System.arraycopy( additionalSuffixes, 0, subKey, namespace.length + 1, additionalSuffixes.length); - return new CompositeKeyPrefix(subKey); + long subKeyWeight = weight + findWeight(suffix); + for (int i = 0; i < additionalSuffixes.length; ++i) { + subKeyWeight += findWeight(additionalSuffixes[i]); + } + return new CompositeKeyPrefix(subKey, subKeyWeight); } CompositeKey valueKey(K k) { - return new CompositeKey(namespace, k); + return new CompositeKey(namespace, weight, k); } boolean isProperPrefixOf(CompositeKey otherKey) { @@ -268,18 +344,20 @@ boolean isEquivalentNamespace(CompositeKey otherKey) { /** A tuple of key parts used to represent a key within a cache. */ @VisibleForTesting - static class CompositeKey { + static class CompositeKey implements Weighted { private final Object[] namespace; private final Object key; + private final long weight; - private CompositeKey(Object[] namespace, Object key) { + private CompositeKey(Object[] namespace, long namespaceWeight, Object key) { this.namespace = namespace; this.key = key; + this.weight = namespaceWeight + findWeight(key); } @Override public String toString() { - return "CompositeKey{" + "namespace=" + Arrays.toString(namespace) + ", key=" + key + "}"; + return "CompositeKey{namespace=" + Arrays.toString(namespace) + ", key=" + key + "}"; } @Override @@ -298,6 +376,11 @@ public boolean equals(Object o) { public int hashCode() { return Arrays.hashCode(namespace); } + + @Override + public long getWeight() { + return weight; + } } /** @@ -310,7 +393,11 @@ public static class ClearableCache extends SubCache { private final Set weakHashSet; public ClearableCache(Cache cache) { - super(((SubCache) cache).cache, ((SubCache) cache).keyPrefix); + super( + ((SubCache) cache).cache, + ((SubCache) cache).keyPrefix, + ((SubCache) cache).maxWeightInBytes, + ((SubCache) cache).weightInBytes); // We specifically use a weak hash map so that once the key is no longer referenced we don't // have to keep track of it anymore and the weak hash map will garbage collect it for us. this.weakHashSet = Collections.newSetFromMap(new WeakHashMap<>()); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 5742db356e49..b3206d22ee99 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -287,7 +287,8 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) { statusApiServiceDescriptor, channelFactory::forDescriptor, processBundleHandler.getBundleProcessorCache(), - options); + options, + processWideCache); } // TODO(BEAM-9729): Remove once runners no longer send this instruction. diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java index ccdfacf1fc75..931a4cd1123c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.apache.beam.fn.harness.Cache; import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor; import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache; import org.apache.beam.model.fnexecution.v1.BeamFnApi; @@ -52,17 +53,20 @@ public class BeamFnStatusClient implements AutoCloseable { private final CompletableFuture inboundObserverCompletion; private static final Logger LOG = LoggerFactory.getLogger(BeamFnStatusClient.class); private final MemoryMonitor memoryMonitor; + private final Cache cache; public BeamFnStatusClient( ApiServiceDescriptor apiServiceDescriptor, Function channelFactory, BundleProcessorCache processBundleCache, - PipelineOptions options) { + PipelineOptions options, + Cache cache) { this.channel = channelFactory.apply(apiServiceDescriptor); this.outboundObserver = BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); this.processBundleCache = processBundleCache; this.memoryMonitor = MemoryMonitor.fromOptions(options); + this.cache = cache; this.inboundObserverCompletion = new CompletableFuture<>(); Thread thread = new Thread(memoryMonitor); thread.setDaemon(true); @@ -157,6 +161,15 @@ String getMemoryUsage() { memory.add(memoryMonitor.describeMemory()); return memory.toString(); } + + @VisibleForTesting + String getCacheStats() { + StringJoiner cacheStats = new StringJoiner("\n"); + cacheStats.add("========== CACHE STATS =========="); + cacheStats.add(cache.describeStats()); + return cacheStats.toString(); + } + /** Class representing the execution state of a bundle. */ static class BundleState { final String instruction; @@ -231,6 +244,8 @@ public void onNext(WorkerStatusRequest workerStatusRequest) { StringJoiner status = new StringJoiner("\n"); status.add(getMemoryUsage()); status.add("\n"); + status.add(getCacheStats()); + status.add("\n"); status.add(getActiveProcessBundleState()); status.add("\n"); status.add(getThreadDump()); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java index b2d43afbf64b..b004ba905083 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java @@ -17,15 +17,17 @@ */ package org.apache.beam.fn.harness; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import org.apache.beam.fn.harness.Cache.Shrinkable; import org.apache.beam.fn.harness.Caches.ClearableCache; -import org.apache.beam.fn.harness.Caches.ShrinkOnEviction; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.sdk.util.Weighted; +import org.apache.beam.sdk.util.WeightedValue; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,24 +46,24 @@ public void testNoopCache() throws Exception { @Test public void testShrinkableIsShrunk() throws Exception { + WeightedValue shrinkableKey = WeightedValue.of("shrinkable", MB); Shrinkable shrinkable = new Shrinkable() { @Override public Object shrink() { - return "wasShrunk"; + return WeightedValue.of("wasShrunk", 1); } }; - Cache cache = - Caches.forCache(new ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(1)).getCache()); - cache.put("shrinkable", shrinkable); + Cache cache = Caches.forMaximumBytes(2 * MB); + cache.put(shrinkableKey, WeightedValue.of(shrinkable, MB)); // Check that we didn't evict it yet - assertSame(shrinkable, cache.peek("shrinkable")); + assertSame(shrinkable, cache.peek(shrinkableKey)); - // The next insertion should cause the value to b "shrunk" - cache.put("other", "value"); - assertEquals("wasShrunk", cache.peek("shrinkable")); + // The next insertion should cause the value to be "shrunk" + cache.put(WeightedValue.of("other", 1), WeightedValue.of("value", 1)); + assertEquals("wasShrunk", cache.peek(shrinkableKey)); } @Test @@ -160,4 +162,63 @@ private void testCache(Cache cache) { assertEquals("value2", cache.computeIfAbsent("key2", (unused) -> "value2")); assertEquals("value2", cache.peek("key2")); } + + private static final long MB = 1 << 20; + + private static class ShrinkableString implements Shrinkable, Weighted { + private final String value; + private final long weight; + + public ShrinkableString(String value, long weight) { + this.value = value; + this.weight = weight; + } + + @Override + public ShrinkableString shrink() { + if (weight < 800 * MB) { + return null; + } + return new ShrinkableString(value, weight / 2); + } + + @Override + public long getWeight() { + return weight; + } + } + + @Test + public void testDescribeStats() throws Exception { + Cache, ShrinkableString> cache = Caches.forMaximumBytes(1000 * MB); + for (int i = 0; i < 100; ++i) { + cache.computeIfAbsent( + WeightedValue.of(i, MB), (key) -> new ShrinkableString("value", 2 * MB)); + cache.peek(WeightedValue.of(i, MB)); + cache.put(WeightedValue.of(100 + i, MB), new ShrinkableString("value", 2 * MB)); + } + + assertThat(cache.describeStats(), containsString("used/max 600/1000 MB")); + assertThat(cache.describeStats(), containsString("hit 50.00%")); + assertThat(cache.describeStats(), containsString("lookups 200")); + assertThat(cache.describeStats(), containsString("avg load time")); + assertThat(cache.describeStats(), containsString("loads 100")); + assertThat(cache.describeStats(), containsString("evictions 0")); + + // Test eviction, evict all the other 200 elements that were added + cache.put(WeightedValue.of(1000, 100 * MB), new ShrinkableString("value", 900 * MB)); + assertThat(cache.describeStats(), containsString("used/max 1000/1000 MB")); + assertThat(cache.describeStats(), containsString("evictions 200")); + + // Test shrinking, 900 -> 450 + 100 + 55 + 1 = 606 + cache.put(WeightedValue.of(1001, MB), new ShrinkableString("value", 55 * MB)); + assertThat(cache.describeStats(), containsString("used/max 606/1000 MB")); + assertThat(cache.describeStats(), containsString("evictions 201")); + + // Test composite key namespace is weighed as well. + // 33 + 8 + 3 = 44 more then last used/max of 606 = 650 + Caches.subCache(cache, WeightedValue.of("subCache", 33 * MB)) + .put(WeightedValue.of("subCacheKey", 8 * MB), WeightedValue.of("subCacheValue", 3 * MB)); + assertThat(cache.describeStats(), containsString("used/max 650/1000 MB")); + } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java index fb9182d86d40..d7d1a501389f 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.fn.harness.Caches; import org.apache.beam.fn.harness.control.ProcessBundleHandler; import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor; import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache; @@ -86,7 +87,8 @@ public void testActiveBundleState() { apiServiceDescriptor, channelFactory::forDescriptor, handler.getBundleProcessorCache(), - PipelineOptionsFactory.create()); + PipelineOptionsFactory.create(), + Caches.noop()); StringJoiner joiner = new StringJoiner("\n"); joiner.add(client.getActiveProcessBundleState()); String actualState = joiner.toString(); @@ -128,7 +130,8 @@ public StreamObserver workerStatus( apiServiceDescriptor, channelFactory::forDescriptor, processorCache, - PipelineOptionsFactory.create()); + PipelineOptionsFactory.create(), + Caches.noop()); StreamObserver requestObserver = requestObservers.take(); requestObserver.onNext(WorkerStatusRequest.newBuilder().setId("id").build()); WorkerStatusResponse response = values.take(); @@ -138,4 +141,18 @@ public StreamObserver workerStatus( server.shutdownNow(); } } + + @Test + public void testCacheStatsExist() { + ManagedChannelFactory channelFactory = InProcessManagedChannelFactory.create(); + BeamFnStatusClient client = + new BeamFnStatusClient( + apiServiceDescriptor, + channelFactory::forDescriptor, + mock(BundleProcessorCache.class), + PipelineOptionsFactory.create(), + Caches.fromOptions( + PipelineOptionsFactory.fromArgs("--maxCacheMemoryUsageMb=234").create())); + assertThat(client.getCacheStats(), containsString("used/max 0/234 MB")); + } }