Conversation

@jrmccluskey (Contributor):

Adds an implementation of a SideInputCache type that will be used to handle side input caching at the SDK harness level. Creates the package, adds the implementation, and adds unit tests.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

[CI status badge tables from the PR template omitted: ValidatesRunner compliance, examples testing on various runners, post-commit SDK/transform integration tests, pre-commit tests, and GitHub Actions, all on the master branch.]

See .test-infra/jenkins/README for the trigger phrase, status, and link of all Jenkins jobs, and see CI.md for more information about GitHub Actions CI.

@jrmccluskey (Author):

R: @lostluck

@jrmccluskey (Author):

We can move the code anywhere it makes sense; I just nested it in the harness directory for now, since that's the top level it would be instantiated in.

@pabloem requested a review from lostluck on September 9, 2021.
@lostluck (Contributor) left a review:

I like the clean factoring for this. The real trick is where it needs to live. The comments are a very good start too; only a couple of nits.

// limitations under the License.

// Package statecache implements the state caching feature described by the
// Beam Fn API
@lostluck (Contributor):

If you have a link to a Doc, or even just an appropriate location of the proto on github, that might be useful to add here.

@jrmccluskey (Author):

Done; added a link to a caching document with the usual graphics explaining the behavior.

@lostluck (Contributor):

Consider describing what the link is? As it stands, it's just a standalone link that's probably about caching, but could be anything.

e.g. "The Beam State API along with intended caching behavior is described here:"

@jrmccluskey (Author):

Added

@lostluck (Contributor):

Skimming through that section of the doc, the recommendation is to "use a memory cost weighted LRU strategy with an upper bound as a fixed percentage of available memory," which, I agree, would be great, but also complicated (since we don't know the decoded size of the reusable input until after it's been decoded...). Random eviction with a simple cap will have to do for now.
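
For reference, a bare sketch of that strategy, leaning on Go's unspecified map iteration order to stand in for random choice (the type and names here are illustrative, not the PR's actual API):

// boundedCache is a hypothetical stand-in, not the PR's SideInputCache.
type boundedCache struct {
    entries  map[string]interface{}
    capacity int
}

// put evicts one pseudo-randomly chosen entry when at capacity. Ranging
// over a Go map visits keys in an unspecified order, so deleting the
// first key visited approximates random eviction.
func (c *boundedCache) put(key string, value interface{}) {
    if len(c.entries) >= c.capacity {
        for victim := range c.entries {
            delete(c.entries, victim)
            break
        }
    }
    c.entries[key] = value
}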

func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.clearValidTokens()
@lostluck (Contributor):

Since the tokens are for Cross Bundle caching, wouldn't this prevent caching from happening across different bundles, if this is called for each ProcessBundleRequest?

@jrmccluskey (Author):

This is what I consider to be the best part of the implementation. The list of valid tokens is kept separate from the cache itself, so clearing that list doesn't invalidate the cached ReusableInputs. On each request we get the pool of valid side input tokens from the runner, so we need that check of which side inputs we're allowed to re-use for that call; however, we still keep all of the side inputs we've seen cached, up to the capacity, so if they become relevant for future bundles the cache already has them.
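
Roughly, the separation described works like this (a sketch only; field names and shapes are assumed for illustration):

type sideInputCache struct {
    cache       map[string]exec.ReusableInput // everything seen so far, up to capacity
    validTokens map[string]bool               // tokens usable for the current bundle
}

func (c *sideInputCache) setValidTokens(tokens ...string) {
    // Reset validity only; c.cache is untouched, so inputs from earlier
    // bundles stay cached and can be served again if their tokens become
    // valid for a future bundle.
    c.validTokens = make(map[string]bool)
    for _, t := range tokens {
        c.validTokens[t] = true
    }
}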

@lostluck (Contributor):

The returned cache ids are the valid cache tokens for that specific bundle, not across all active bundles in the worker. Wiping them out on every request means other bundles won't get cache behavior since makeAndValidateToken returns false in that case. I go into a possible solution in another comment.

Comment on lines 70 to 76
        } else {
            s := tok.GetSideInput()
            transformID := s.GetTransformId()
            sideInputID := s.GetSideInputId()
            token := string(tok.GetToken())
            c.setValidToken(transformID, sideInputID, token)
        }
@lostluck (Contributor):

Style nit: idiomatic Go says to drop the "else" block when the if block is terminal. The rest of the loop is already getting skipped by the continue when the if clause is true, so there's no need for the else block and its indentation.
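
Applied to the quoted snippet, the loop would read as below. The condition on the first branch isn't shown in the quoted diff, so the GetUserState() check here is a plausible guess given the CacheToken oneof, not the PR's confirmed code:

for _, tok := range cacheTokens {
    if tok.GetUserState() != nil { // assumed stand-in for the terminal if-clause
        continue
    }
    s := tok.GetSideInput()
    transformID := s.GetTransformId()
    sideInputID := s.GetSideInputId()
    token := string(tok.GetToken())
    c.setValidToken(transformID, sideInputID, token)
}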

@jrmccluskey (Author):

Done

return "", false
}
// Check if the known token is valid for this request
for _, t := range c.validTokens {
@lostluck (Contributor):

Why not just have a valid token map, instead of iterating through a slice?

@jrmccluskey (Author):

Now that I think about it some more, having both the slice and the map of IDs to tokens seems unnecessary. Changing to only populating the map for validity would work.

@jrmccluskey (Author):

Done (although you still have to iterate over the map because the cache tokens are the values and not the keys)


// QueryCache takes a transform ID and side input ID and checks whether a corresponding side
// input has been cached. A query having a bad token (e.g. one that doesn't make a known
// token or one that makes a known but currently invalid token) is treated the same as a
@lostluck (Contributor):

What invalidates a side input token?

@jrmccluskey (Author):

In short: the runner decides what side input tokens are valid/invalid for a given bundle. In this current setup the map of IDs to tokens is disjoint from this concept of validity; it just keeps up with tokens that have been seen before. So an invalid token in this case would be a token that has been seen before but isn't valid for the current bundle. Of course, I now think that extra level may be unnecessary.

@lostluck (Contributor):

The issue with the makeAndValidateToken call is that, as implemented, a newer bundle with different side inputs will invalidate cache calls for side inputs of other existing bundles. And this can flip back and forth on boundary conditions as a result. (One bundle needs set A, the next bundle needs set B, the third needs set A again, invalidating all the B cache requests...)

So this implies we should roughly keep the active set of tokens for the active bundles. Say a map[token]int (or map[token]int8) which says how many bundles are making use of a given token. At the start of a bundle we add its tokens to the active set (incrementing their counts), and when the bundle ends we remove them (decrementing the counts), deleting entries when the count reaches 0 (this is important for iteration/eviction, and to keep the map memory small).

That way, the isValid call can check validity by seeing if c.validTokens[token] > 0, prioritizing deletions for tokens that are not valid as you intend, rather than simply prioritizing "the most recent bundle".
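
A sketch of that bookkeeping (method names here are hypothetical, and callers are assumed to hold the cache's lock):

// validTokens maps a token to the number of active bundles using it:
//   validTokens map[token]int8

// validateTokens is called with a bundle's cache tokens when the bundle starts.
func (c *SideInputCache) validateTokens(toks ...token) {
    for _, t := range toks {
        c.validTokens[t]++
    }
}

// completeTokens is called with the same tokens when the bundle finishes.
// Entries are deleted at zero to keep the map small for iteration and eviction.
func (c *SideInputCache) completeTokens(toks ...token) {
    for _, t := range toks {
        c.validTokens[t]--
        if c.validTokens[t] <= 0 {
            delete(c.validTokens, t)
        }
    }
}

// A token is valid while at least one active bundle is using it.
func (c *SideInputCache) isValid(t token) bool {
    return c.validTokens[t] > 0
}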

    }
    // Nothing is deleted if every side input is still valid, meaning that the cache size
    // is likely too small.
    if !deleted {
@lostluck (Contributor):

While it's a good idea to try to avoid evicting a small set of values from the cache, it would be incorrect to fail pipelines because the cache is too small. The cost of too small a cache should be "more work" because of the additional read from the runner.

The Portable Beam model does allow for SDK side metrics that aren't attached to any bundle. SideInput cache hits, misses, and evictions, and "in use" evictions would be a great set of metrics to collect, and we can see how much this gets used for arbitrary pipelines. I recommend we collect those instead (int64s protected by the lock, possibly abstracted into an easy to copy/return by value struct for later access), and I'll work with you to get them into the monitoring responses in a later PR.

This also avoids returning errors around when things can't/shouldn't fail.
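
One possible shape for those counters; the struct and field names are illustrative, not a settled API, and a metrics field guarded by c.mu is assumed on SideInputCache:

// CacheMetrics is a plain value snapshot of the counters.
type CacheMetrics struct {
    Hits               int64
    Misses             int64
    Evictions          int64
    InUseEvictionCalls int64
}

// Metrics copies the counters out under the cache's lock; returning by
// value means callers never hold a reference into the guarded struct.
func (c *SideInputCache) Metrics() CacheMetrics {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.metrics
}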

@jrmccluskey (Author), Sep 10, 2021:

I do like the idea of moving to populating metrics for this instead of throwing an error.

@jrmccluskey (Author):

Made a first run at it; hopefully it's what you have in mind. All of the operations that would add metrics (hit, miss, evict, failed evict) are already behind the lock, so that was pretty straightforward.

@lostluck (Contributor):

Metrics: great!

But also, as mentioned, we probably always want to delete something if an eviction is necessary, to avoid memory problems that will just end up crashing a worker if left to grow.

It's totally OK to do it if the currently active set of tokens says everything is being used; hence the value of the "InUseEvictionCalls" metric. We/users can see if it's a problem, and we can adjust the default, or make it configurable so users can make that choice if needed.


// SideInputCache stores a cache of reusable inputs for the purposes of
// eliminating redundant calls to the runner during execution of ParDos
// using side inputs.
@lostluck (Contributor):

Consider describing in prose how use of this maps to an SDK worker lifecycle. Not in great depth, but in particular: initialization, a bundle request coming in, during bundle execution, bundle termination, and what happens when subsequent bundle requests come in (multiple happening simultaneously, sequentially, etc.). If eviction needs to happen, what's prioritized?

The goal is to allow someone to understand how it works, and possibly use it (maybe not in this case) without reading the raw code.

@jrmccluskey (Author):

Added a slightly more in-depth comment

// eliminating redundant calls to the runner during execution of ParDos
// using side inputs.
type SideInputCache struct {
    cache map[string]*exec.ReusableInput
@lostluck (Contributor):

Interfaces must never be referred to by pointer, as they're already reference types. In short, drop the *.

type ReusableInput interface {

Interfaces are also fine with being duplicated, so if necessary we can define our own ReusableInput interface here that's identical to exec.ReusableInput, and break that explicit dependency, which currently prevents the exec package from using this. (Though not the harness package, which is certainly where it starts out. Hmm, more information required for a meaningful review.)
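
For illustration, the duplication would look like the following; the method set here is assumed to mirror exec.ReusableInput, and Go's structural typing means exec's concrete types satisfy the local copy automatically:

// ReusableInput mirrors exec.ReusableInput so this package carries no
// import of exec; any type satisfying one satisfies the other.
type ReusableInput interface {
    Init() error
    Value() interface{}
    Reset() error
}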

@jrmccluskey (Author):

Fixed the pointer reference. The interface duplication is an interesting question.

Comment on lines 42 to 43
    cache       map[string]exec.ReusableInput
    idsToTokens map[string]string
@lostluck (Contributor):

When it comes to primitive types like string or int, it's good to at least comment what they are, e.g. "cache from tokens to bar", "cache from PTransform IDs to tokens", etc.

It might be good to make the typing explicit, e.g. define a type token string and use that for all token instances. It means the type system will prevent accidental misuse, and it will make the code and its intent a bit more readable.
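
A quick illustration of the safety that buys (a hypothetical snippet, not the PR's code):

type token string

func lookup(t token) { /* ... */ }

func example(raw string) {
    // lookup(raw)     // compile error: cannot use raw (type string) as type token
    lookup(token(raw)) // the conversion must be explicit, making intent visible
}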

@jrmccluskey (Author):

Done.

}

// Init makes the cache map and the map of IDs to cache tokens for the
// SideInputCache. Should only be called once.
@lostluck (Contributor):

Please add that it "returns an error for non-positive capacities".

@jrmccluskey (Author):

Done.


@jrmccluskey (Author):

I wound up not deleting by default when a count hits zero on bundle completion, only because in a single-threaded situation where you have one bundle being processed at a time, you'd be wiping the cache of everything it needs between calls. For right now, it feels better to evict only when necessary than every time a bundle ends.

@lostluck (Contributor) left a review:

Minor nits now. Overall LGTM after the changes.

I agree that the actual underlying cache shouldn't be deleted at the ends of bundles, for exactly the situation you're describing. But we should delete the entry in the validTokens map, because otherwise the validTokens map will grow unbounded during long-running streaming jobs. It's only a few dozen bytes at a time, but no reason to have something inscrutable to debug a year or two down the road.

Since evictions are only happening with usage (as coded), there's little risk of too much getting deleted as a result, when there are no active bundles. The next bundle, assuming it's using the same token, will cache it anyway.

func (c *SideInputCache) isValid(tok token) bool {
    count, ok := c.validTokens[tok]
    // If the token is not known or not in use, return false
    if !ok || count <= 0 {
@lostluck (Contributor):

Consider return ok && count > 0 instead of an if and explicit false/true statements. The conjunction (&&) is very well defined and reads clearly.

@jrmccluskey (Author):

Done

    cache       map[token]exec.ReusableInput
    idsToTokens map[string]token
    validTokens map[token]int8
    mu          sync.Mutex
@lostluck (Contributor):

Conventionally, put the mutex at the top of the group of fields it's protecting (putting unprotected fields before or after them in a separate grouping).
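
Applied to the quoted fields, the conventional grouping reads (sketch):

type SideInputCache struct {
    mu          sync.Mutex // guards the three maps below
    cache       map[token]exec.ReusableInput
    idsToTokens map[string]token
    validTokens map[token]int8
}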

@jrmccluskey (Author):

Done

type SideInputCache struct {
    cache       map[token]exec.ReusableInput
    idsToTokens map[string]token
    validTokens map[token]int8
@lostluck (Contributor):

It's worth commenting what the map is for, since it's got a non-obvious signature WRT the field name, e.g. // maps tokens to active bundle counts

@jrmccluskey (Author):

Done

@jrmccluskey (Author):

Also resolved clearing the validTokens map entries.

Comment on lines +101 to +106
    count, ok := c.validTokens[tok]
    if !ok {
        c.validTokens[tok] = 1
    } else {
        c.validTokens[tok] = count + 1
    }
@lostluck (Contributor):

I like the structural parity between this method and decrementTokenCount, but I do want to call out something about Go maps.

FYI, Go maps automatically return the zero value of the value type for missing keys. See https://tour.golang.org/moretypes/22

In this case, it's 0. In other words, this block can be simplified a little (but, as I said, structural parity).
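
That is, the quoted block can collapse to a single statement (sketch):

// A missing key reads as the zero value 0, so this stores 1 for new
// tokens and count+1 for existing ones.
c.validTokens[tok]++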

// cache miss.
func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReusableInput {
    c.mu.Lock()
    defer c.mu.Unlock()
@lostluck (Contributor):

I have a small concern around the QueryCache call, in that at worst it means a lock call (and a serialization of cross-bundle calls) for every element, but I expect the single-value in-bundle cache plus the benefits of this cache in general override that concern. However, that's something we can probably figure out a way to measure, or have an "off switch" for just in case, in a later PR.

@lostluck merged commit 25c67cd into apache:master on Sep 10, 2021.
@jrmccluskey deleted the caching branch on October 1, 2021.
dmitriikuzinepam pushed a commit to dmitriikuzinepam/beam that referenced this pull request on Nov 2, 2021.