Flink 2.0: Replace Caffeine maxSize cache with LRUCache#13382
Flink 2.0: Replace Caffeine maxSize cache with LRUCache#13382pvary merged 1 commit intoapache:mainfrom
Conversation
715be14 to
0689560
Compare
|
Is Caffeine is a multi-threaded cache with an adaptive eviction policy that maximizes the hit rates based on the observed workload. This does incur additional overhead but can greatly improve the overall system performance. I adjusted Caffeine's benchmark to run as a single threaded and with 16 threads on my 14-core M3 MAX laptop using OpenJDK 24. This uses a Zipfian distribution to simulate hot/cold items with a 100% hit rate. This was not a clean system as I was on a conference call while writing code, which only hurt Caffeine since it will utilize all of the cores. In a cloud environment you will likely observe worse throughput due to virtualization, numa effects, noisy neighbors, older hardware, etc. In general you want to drive application performance decision by profiling to resolve hotspots, as small optimizations can backfire when your benchmark does not fit your real workload.
|
|
Thank you for a detailed reply.
It was unclear from the caffeine project description that the cache is specifically optimised for high concurrency. I also didn't find any single-threaded benchmarks online, so I wrote my own here which gave these results: I guess it makes sense that caffeine performs worse in a single-threaded scenario due to thread synchronisation. It might be worth clarifying this on the project page so it's more visible for users. |
|
Yep, yours are reasonable but you don’t need to have the loop and can let jmh handle it for better clarity of the results. Those are 100k calls per unit. It’s not that much worse as 44M vs 76M reads/s is far faster than typically needed. Usually those who care need primitive collections and are very specialized. The much better hit rate than LRU more than compensates because milliseconds for an extra miss outweighs saving a few nanoseconds per hit. LHM as an LRU is really great, but LRU isn’t as good as people assume. |
|
Thanks for the PR @aiborodin, and @ben-manes for the nice detailed test and explanation. We are debating sharing the TableCache on JVM level. Your highlights about the concurrent cache access will be a very useful data point in that discussion. Currently the cache access is single threaded, so we can get away with the suggested LHM solution. |
0689560 to
d430d54
Compare
mxm
left a comment
There was a problem hiding this comment.
That's a clever find! As Peter said, we want to eventually share this cache across all components of DynamicSink. We might want to re-evaluate then, but this is good for now.
What about the other instances of Caffeine in the Flink module? E.g. TableSerializerCache, HashKeyGenerator, DynamicWriteResultAggregator, DynamicWriter (they all use Caffeine).
|
@aiborodin: Do you want to check the other instances of the cache mentioned by @mxm in this PR, or you want to do them in another PR? |
d430d54 to
c7afaa2
Compare
|
I replaced all Caffeine this.specs =
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
this.outputFileFactories =
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();Should we replace that with |
|
We can probably replace that one as well. There is nothing inherently different about that cache. |
Before we move forward, let's take a step back and consider the size of the cache, and the cost of a cache miss. How many items do we need in the cache for the optimal operation:
This one looks like number 3 to me. Also the cache access is only several times during a checkpoint, so the performance is less important. So for me this seems like better handled by caffeine, but I could be convinced. Also please revisit the pervious decisions on the cache sizes, and reconsider if appropriate. Thanks, |
|
You are right Peter, the cache size should probably be proportional to the "active" tables, but using a time-based eviction policy, we have seen worse performance. That was the reason we switched to max size / LRU. So there should probably be an LRU eviction policy not across ALL tables, but per-table. We could extend LRUCache to use a LinkedList per table. We probably have to then compare it again to something like Caffeine to decide if the custom cache implementation is still worth it. |
IIUC, we access this cache only a few times every checkpoint. Sum of (table x parallelism for the table). Not very few, but not like for every record. Probably doesn't worth the extra complexity. |
c7afaa2 to
ac76a15
Compare
We recently discovered that LRUCache, based on LinkedHashMap, performs almost twice as fast as the Caffeine max size cache. Let's replace the caffeine cache to optimise the performance.
ac76a15 to
7ba9960
Compare
|
I agree that the caching/eviction policy makes more sense on a per-table basis. We can keep this PR scoped to replacing |
|
LGTM |
|
Merged to main. |
|
Thank you for merging @pvary! Thank you, @ben-manes, for the benchmarking analysis and @mxm, for the review. |

We recently discovered that LRUCache, based on
LinkedHashMap, has a throughput almost two times as high as the Caffeine Cache with the maximum size configured. Please, see JMH benchmark results here.Let's use
LRUCacheinTableMetadataCacheto improve the cache performance of theDynamicIcebergSink.