sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go (215 additions, 0 deletions)
@@ -0,0 +1,215 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package statecache implements the state caching feature described by the
// Beam Fn API
Contributor: If you have a link to a Doc, or even just an appropriate location of the proto on GitHub, that might be useful to add here.

Contributor Author: Done, added a link to a caching document with the usual graphics explaining the behavior.

Contributor: Consider describing what the link is? As it stands, it's just a standalone link that's probably about caching, but could be anything. e.g. "The Beam State API along with intended caching behavior is described here:"

Contributor Author: Added.

Contributor: Skimming through that section of the doc, the recommendation is to "use a memory cost weighted LRU strategy with an upper bound as a fixed percentage of available memory," which I agree would be great, but is also complicated (since we don't know the decoded size of the reusable input until after it's been decoded...). Random eviction with a simple cap will have to do for now.
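
For context, a rough sketch of what that memory-cost-weighted LRU could look like, assuming (hypothetically) that each entry's decoded byte size were known up front; the costLRU type and sizeBytes parameter are illustrative and not part of this PR:

import (
	"container/list"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
)

// costLRU evicts least-recently-used entries until the total decoded
// bytes fit under a fixed budget.
type costLRU struct {
	maxBytes, curBytes int64
	order              *list.List              // front = most recently used
	items              map[token]*list.Element // each Element's Value is a *lruEntry
}

type lruEntry struct {
	key   token
	value exec.ReusableInput
	bytes int64
}

func newCostLRU(maxBytes int64) *costLRU {
	return &costLRU{maxBytes: maxBytes, order: list.New(), items: make(map[token]*list.Element)}
}

func (l *costLRU) add(key token, v exec.ReusableInput, sizeBytes int64) {
	l.items[key] = l.order.PushFront(&lruEntry{key, v, sizeBytes})
	l.curBytes += sizeBytes
	// Evict from the least-recently-used end until back under the byte budget.
	for l.curBytes > l.maxBytes && l.order.Len() > 0 {
		back := l.order.Back()
		e := back.Value.(*lruEntry)
		l.order.Remove(back)
		delete(l.items, e.key)
		l.curBytes -= e.bytes
	}
}

The catch the reviewer notes is sizeBytes: the decoded size of a reusable input isn't known until after it has been decoded, so the byte budget can't be enforced before the memory cost is already paid.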

//
// The Beam State API and the intended caching behavior are described here:
// https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
package statecache

import (
	"sync"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
)

type token string

// SideInputCache stores a cache of reusable inputs for the purposes of
// eliminating redundant calls to the runner during execution of ParDos
// using side inputs.
Contributor: Consider describing in prose how use of this maps to the SDK worker lifecycle. Not in great depth, but in particular: initialization, a bundle request coming in, during bundle execution, bundle termination, subsequent bundle requests coming in (multiple happening simultaneously, sequentially, etc.), and, if eviction needs to happen, what's prioritized.

The goal is to allow someone to understand how it works, and possibly use it (maybe not in this case), without reading the raw code.

Contributor Author: Added a slightly more in-depth comment.

//
// A SideInputCache should be initialized when the SDK harness is initialized,
// creating storage for side input caching. On each ProcessBundleRequest,
// the cache will process the list of tokens for cacheable side inputs and
// be queried when side inputs are requested in bundle execution. Once a
// new bundle request comes in, the valid tokens will be updated and the cache
// will be re-used. In the event that the cache reaches capacity, a random,
// currently invalid cached object will be evicted.
type SideInputCache struct {
	capacity    int
	mu          sync.Mutex
	cache       map[token]exec.ReusableInput
	idsToTokens map[string]token
	validTokens map[token]int8 // Maps tokens to active bundle counts
	metrics     CacheMetrics
}

type CacheMetrics struct {
	Hits           int64
	Misses         int64
	Evictions      int64
	InUseEvictions int64
}
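
To make the lifecycle above concrete, a hypothetical usage sketch; the caller-side names (toks, materialize, transformID, sideInputID) are illustrative, not actual harness call sites:

func exampleBundleLifecycle(toks []fnpb.ProcessBundleRequest_CacheToken, transformID, sideInputID string, materialize func() exec.ReusableInput) {
	var cache SideInputCache
	// At SDK harness initialization; a capacity of 20 is an arbitrary example.
	if err := cache.Init(20); err != nil {
		panic(err) // a non-positive capacity is a programming error
	}

	// At the start of each ProcessBundleRequest, register its cache tokens.
	cache.SetValidTokens(toks...)

	// During bundle execution, before reading a side input from the runner.
	input := cache.QueryCache(transformID, sideInputID)
	if input == nil {
		// Cache miss: materialize from the runner, then offer it to the cache.
		input = materialize()
		cache.SetCache(transformID, sideInputID, input)
	}

	// Once the bundle completes, release its hold on the tokens.
	cache.CompleteBundle(toks...)
}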

// Init makes the cache map and the map of IDs to cache tokens for the
// SideInputCache. Should only be called once. Returns an error for
// non-positive capacities.
func (c *SideInputCache) Init(cap int) error {
	if cap <= 0 {
		return errors.Errorf("capacity must be a positive integer, got %v", cap)
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cache = make(map[token]exec.ReusableInput, cap)
	c.idsToTokens = make(map[string]token)
	c.validTokens = make(map[token]int8)
	c.capacity = cap
	return nil
}

// SetValidTokens marks the given cache tokens as valid for a new bundle, updating the mapping of
// transform and side input IDs to cache tokens and incrementing each token's active bundle count
// in the process. Should be called at the start of every new ProcessBundleRequest. If the runner
// does not support caching, the passed cache token values should be empty and all get/set
// requests will silently be no-ops.
func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, tok := range cacheTokens {
		// User State caching is currently not supported, so these tokens are ignored
		if tok.GetUserState() != nil {
			continue
		}
		s := tok.GetSideInput()
		transformID := s.GetTransformId()
		sideInputID := s.GetSideInputId()
		t := token(tok.GetToken())
		c.setValidToken(transformID, sideInputID, t)
	}
}

// setValidToken adds a new valid token for a request into the SideInputCache struct
// by mapping the transform ID and side input ID pairing to the cache token.
func (c *SideInputCache) setValidToken(transformID, sideInputID string, tok token) {
	idKey := transformID + sideInputID
	c.idsToTokens[idKey] = tok
	count, ok := c.validTokens[tok]
	if !ok {
		c.validTokens[tok] = 1
	} else {
		c.validTokens[tok] = count + 1
	}
}

Comment on lines +101 to +106

Contributor: I like the structural parity between this method and decrementTokenCount, but I do want to call out something about Go maps.

FYI, Go maps automatically return the zero value for the value type. See https://tour.golang.org/moretypes/22

In this case, it's 0. In other words, this block can be simplified a little (but, as I said, structural parity).
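
For illustration, a minimal sketch of that simplification, relying on the zero-value lookup behavior; this is an alternative body, not the one merged here:

func (c *SideInputCache) setValidTokenSimplified(transformID, sideInputID string, tok token) {
	c.idsToTokens[transformID+sideInputID] = tok
	// A lookup of a missing key yields int8's zero value, 0, so a single
	// increment covers both the first and subsequent uses of the token.
	c.validTokens[tok]++
}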

// CompleteBundle takes the same cache tokens that were passed to set the valid tokens and
// decrements their usage counts, maintaining an accurate count of whether or not each value
// is still in use. Should be called once ProcessBundle has completed.
func (c *SideInputCache) CompleteBundle(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, tok := range cacheTokens {
		// User State caching is currently not supported, so these tokens are ignored
		if tok.GetUserState() != nil {
			continue
		}
		t := token(tok.GetToken())
		c.decrementTokenCount(t)
	}
}

// decrementTokenCount decrements the validTokens entry for
// a given token by 1. Should only be called when completing
// a bundle.
func (c *SideInputCache) decrementTokenCount(tok token) {
	count := c.validTokens[tok]
	if count == 1 {
		delete(c.validTokens, tok)
	} else {
		c.validTokens[tok] = count - 1
	}
}
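
Taken together, setValidToken and decrementTokenCount form the per-bundle reference count discussed in the review thread further down. A compressed illustration of a token's life across overlapping bundles; the IDs and token name are made up, and real callers go through SetValidTokens and CompleteBundle rather than these internals:

func exampleTokenCounts() {
	var c SideInputCache
	c.Init(4) // arbitrary capacity for the example
	c.setValidToken("pcol1", "side1", "tok-A") // bundle 1 starts; count = 1, token valid
	c.setValidToken("pcol1", "side1", "tok-A") // concurrent bundle 2 uses the same token; count = 2
	c.decrementTokenCount("tok-A")             // bundle 1 completes; count = 1, still valid
	c.decrementTokenCount("tok-A")             // bundle 2 completes; entry deleted, token invalid
}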

func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (token, bool) {
	idKey := transformID + sideInputID
	// Check if it's a known token
	tok, ok := c.idsToTokens[idKey]
	if !ok {
		return "", false
	}
	return tok, c.isValid(tok)
}

// QueryCache takes a transform ID and side input ID and checks whether a
// corresponding side input has been cached. A query with a bad token (e.g.
// one that doesn't make a known token, or one that makes a known but
// currently invalid token) is treated the same as a cache miss.

Contributor: What invalidates a side input token?

Contributor Author: In short: the runner decides which side input tokens are valid/invalid for a given bundle. In this current setup, the map of IDs to tokens is disjoint from this concept of validity; it just keeps up with tokens that have been seen before. So an invalid token in this case would be a token that has been seen before but isn't valid for the current bundle. Of course, I now think that extra level may be unnecessary.

Contributor: So this is the issue with the makeAndValidateToken call: as implemented, a newer bundle with different side inputs will invalidate cache calls for side inputs of other existing bundles. And this can flip back and forth on boundary conditions as a result. (A bundle needs set A, the next bundle needs set B, the third needs set A again, invalidating all the B cache requests...)

So this implies we should roughly keep the active set of tokens for the active bundles. Say, a map[token]int (or map[token]int8) that says how many bundles are making use of a given token. At the start of a bundle we add one to each of its tokens in the active set, and when the bundle ends we subtract one from the active counts, deleting entries when a count reaches 0. (This is important for iterating/eviction, and to keep the map's memory smaller.)

That way, the isValid call can check validity by seeing if c.validTokens[token] > 0, prioritizing deletions for tokens that are not valid as you intend, rather than simply prioritizing "the most recent bundle".
func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReusableInput {
	c.mu.Lock()
	defer c.mu.Unlock()
	tok, ok := c.makeAndValidateToken(transformID, sideInputID)
	if !ok {
		return nil
	}
	// Check to see if cached
	input, ok := c.cache[tok]
	if !ok {
		c.metrics.Misses++
		return nil
	}

	c.metrics.Hits++
	return input
}

Contributor: I have a small concern around the QueryCache call, in that at worst it means a lock acquisition (and a serialization of cross-bundle calls) for every element, but I expect the single-value in-bundle cache plus the benefits of this cache in general override that concern. However, that's something we can probably figure out a way to measure, or have an "off switch" for just in case, in a later PR.
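
One possible shape for that off switch, sketched here as an assumption rather than anything in this PR (requires the "sync/atomic" import):

var sideInputCacheEnabled int32 = 1 // cleared via atomic.StoreInt32(&sideInputCacheEnabled, 0)

func (c *SideInputCache) queryIfEnabled(transformID, sideInputID string) exec.ReusableInput {
	// An atomic load is much cheaper than a mutex acquisition, so a
	// disabled cache avoids the per-element lock entirely.
	if atomic.LoadInt32(&sideInputCacheEnabled) == 0 {
		return nil // treated as a miss; the caller falls back to reading from the runner
	}
	return c.QueryCache(transformID, sideInputID)
}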

// SetCache allows a user to place a ReusableInput materialized from the reader into the SideInputCache
// with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token
// then we silently do not cache the input, as this is an indication that the runner is treating that input
// as uncacheable.
func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.ReusableInput) {
	c.mu.Lock()
	defer c.mu.Unlock()
	tok, ok := c.makeAndValidateToken(transformID, sideInputID)
	if !ok {
		return
	}
	if len(c.cache) >= c.capacity {
		c.evictElement()
	}
	c.cache[tok] = input
}

func (c *SideInputCache) isValid(tok token) bool {
	count, ok := c.validTokens[tok]
	// If the token is not known or not in use, return false
	return ok && count > 0
}

// evictElement randomly evicts a ReusableInput that is not currently valid
// from the cache, falling back to evicting a random valid entry if every
// cached input is still in use. It should only be called by a goroutine
// that obtained the lock in SetCache.
func (c *SideInputCache) evictElement() {
	deleted := false
	// Select a key from the cache at random
	for k := range c.cache {
		// Do not evict an element if it's currently valid
		if !c.isValid(k) {
			delete(c.cache, k)
			c.metrics.Evictions++
			deleted = true
			break
		}
	}
	// Nothing is deleted if every side input is still valid. Clear
	// out a random entry and record the in-use eviction
	if !deleted {
		for k := range c.cache {
			delete(c.cache, k)
			c.metrics.InUseEvictions++
			break
		}
	}
}

Contributor: While it's a good idea to try to avoid evicting a small set of values from the cache, it would be incorrect to fail pipelines because the cache is too small. The cost of too small a cache should be "more work" because of the additional reads from the runner.

The Portable Beam model does allow for SDK-side metrics that aren't attached to any bundle. Side input cache hits, misses, evictions, and "in use" evictions would be a great set of metrics to collect, and we could see how much this gets used for arbitrary pipelines. I recommend we collect those instead (int64s protected by the lock, possibly abstracted into an easy-to-copy, return-by-value struct for later access), and I'll work with you to get them into the monitoring responses in a later PR.

This also avoids returning errors when things can't/shouldn't fail.

Contributor Author (@jrmccluskey, Sep 10, 2021): I do like the idea of moving to populating metrics for this instead of throwing an error.

Contributor Author: Made a first run at it, hopefully it's what you have in mind. All of the operations that would add metrics (hit, miss, evict, failed evict) are already behind the lock, so that was pretty straightforward.

Contributor: Metrics: Great!

But also, as mentioned, we probably always want to delete something if an eviction is necessary, to avoid memory problems that would just end up crashing a worker if left to grow.

It's totally OK to do it if the currently active set of tokens says everything is being used. Hence the value of the InUseEvictions metric. We/users can see if it's a problem, and we can adjust the default or make it configurable so users can make that choice if needed.
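
A sketch of the copy/return-by-value accessor mentioned in the thread; the Metrics method name is an assumption, not part of this change:

func (c *SideInputCache) Metrics() CacheMetrics {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.metrics // returned by value, so callers get a consistent snapshot
}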