From 4e124874bf7c748820436f2a611df94640073569 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 8 Sep 2021 19:03:25 +0000 Subject: [PATCH 01/11] [BEAM-11097] Write first implementation of side input caching type --- .../runtime/harness/statecache/statecache.go | 172 ++++++++++ .../harness/statecache/statecache_test.go | 296 ++++++++++++++++++ 2 files changed, 468 insertions(+) create mode 100644 sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go create mode 100644 sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go new file mode 100644 index 000000000000..5c7da45d5544 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package statecache implements the state caching feature described by the +// Beam Fn API +package statecache + +import ( + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// SideInputCache stores a cache of reusable inputs for the purposes of +// eliminating redundant calls to the runner during execution of ParDos +// using side inputs. +type SideInputCache struct { + cache map[string]*exec.ReusableInput + idsToTokens map[string]string + validTokens []string + mu sync.Mutex + capacity int +} + +// Init makes the cache map and the map of IDs to cache tokens for the +// SideInputCache. Should only be called once. +func (c *SideInputCache) Init(cap int) error { + if cap <= 0 { + return errors.Errorf("capacity must be a positive integer, got %v", cap) + } + c.mu.Lock() + defer c.mu.Unlock() + c.cache = make(map[string]*exec.ReusableInput, cap) + c.idsToTokens = make(map[string]string) + return nil +} + +// Completely clears the list of valid tokens. Should be called when +// starting to handle a new request. +func (c *SideInputCache) clearValidTokens() { + c.validTokens = nil +} + +// SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of +// transform and side input IDs to cache tokens in the process. Should be called at the start of every +// new ProcessBundleRequest. If the runner does not support caching, the passed cache token values +// should be empty and all get/set requests will silently be no-ops. +func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) { + c.mu.Lock() + defer c.mu.Unlock() + c.clearValidTokens() + for _, tok := range cacheTokens { + // User State caching is currently not supported, so these tokens are ignored + if tok.GetUserState() != nil { + continue + } else { + s := tok.GetSideInput() + transformID := s.GetTransformId() + sideInputID := s.GetSideInputId() + token := string(tok.GetToken()) + c.setValidToken(transformID, sideInputID, token) + } + } +} + +// setValidToken adds a new valid token for a request into the SideInputCache struct +// and maps the transform ID and side input ID pairing to the cache token. +func (c *SideInputCache) setValidToken(transformID, sideInputID, token string) { + idKey := transformID + sideInputID + c.idsToTokens[idKey] = token + c.validTokens = append(c.validTokens, token) +} + +func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (string, bool) { + idKey := transformID + sideInputID + // Check if it's a known token + tok, ok := c.idsToTokens[idKey] + if !ok { + return "", false + } + // Check if the known token is valid for this request + for _, t := range c.validTokens { + if t == tok { + return tok, true + } + } + return "", false +} + +// QueryCache takes a transform ID and side input ID and checking if a corresponding side +// input has been cached. A query having a bad token (e.g. one that doesn't make a known +// token or one that makes a known but currently invalid token) is treated the same as a +// cache miss. +func (c *SideInputCache) QueryCache(transformID, sideInputID string) *exec.ReusableInput { + c.mu.Lock() + defer c.mu.Unlock() + tok, ok := c.makeAndValidateToken(transformID, sideInputID) + if !ok { + return nil + } + // Check to see if cached + input, ok := c.cache[tok] + if !ok { + return nil + } + return input +} + +// SetCache allows a user to place a ReusableInput materialized from the reader into the SideInputCache +// with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token +// then we silently do not cache the input, as this is an indication that the runner is treating that input +// as uncacheable. +func (c *SideInputCache) SetCache(transformID, sideInputID string, input *exec.ReusableInput) error { + c.mu.Lock() + defer c.mu.Unlock() + tok, ok := c.makeAndValidateToken(transformID, sideInputID) + if !ok { + return nil + } + if len(c.cache) > c.capacity { + err := c.evictElement() + if err != nil { + return errors.Errorf("Cache at or above capacity, got %v", err) + } + } + c.cache[tok] = input + return nil +} + +func (c *SideInputCache) isValid(token string) bool { + for _, t := range c.validTokens { + if t == token { + return true + } + } + return false +} + +// evictElement randomly evicts a ReusableInput that is not currently valid from the cache. +// It should only be called by a goroutine that obtained the lock in SetCache. +func (c *SideInputCache) evictElement() error { + deleted := false + // Select a key from the cache at random + for k := range c.cache { + // Do not evict an element if it's currently valid + if !c.isValid(k) { + delete(c.cache, k) + deleted = true + break + } + } + // Nothing is deleted if every side input is still valid, meaning that the cache size + // is likely too small. + if !deleted { + return errors.Errorf("Failed to evict elements from cache, every element is currently valid") + } + return nil +} diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go new file mode 100644 index 000000000000..6d82c9817a5b --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statecache + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// TestReusableInput implements the ReusableInput interface for the purposes +// of testing. +type TestReusableInput struct { + transformID string + sideInputID string + value interface{} +} + +func makeTestReusableInput(transformID, sideInputID string, value interface{}) exec.ReusableInput { + return &TestReusableInput{transformID: transformID, sideInputID: sideInputID, value: value} +} + +// Init is a ReusableInput interface method, this is a no-op. +func (r *TestReusableInput) Init() error { + return nil +} + +// Value returns the stored value in the TestReusableInput. +func (r *TestReusableInput) Value() interface{} { + return r.value +} + +// Reset clears the value in the TestReusableInput. +func (r *TestReusableInput) Reset() error { + r.value = nil + return nil +} + +func TestInit(t *testing.T) { + var s SideInputCache + err := s.Init(5) + if err != nil { + t.Errorf("SideInputCache failed but should have succeeded, got %v", err) + } +} + +func TestInit_Bad(t *testing.T) { + var s SideInputCache + err := s.Init(0) + if err == nil { + t.Error("SideInputCache init succeeded but should have failed") + } +} + +func TestQueryCache_EmptyCase(t *testing.T) { + var s SideInputCache + err := s.Init(1) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + output := s.QueryCache("side1", "transform1") + if output != nil { + t.Errorf("Cache hit when it should have missed, got %v", output) + } +} + +func TestSetCache_UncacheableCase(t *testing.T) { + var s SideInputCache + err := s.Init(1) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + input := makeTestReusableInput("t1", "s1", 10) + err = s.SetCache("t1", "s1", &input) + if err != nil { + t.Errorf("Set cache call failed, got %v", err) + } + output := s.QueryCache("t1", "s1") + if output != nil { + t.Errorf("Cache hit when should have missed, got %v", output) + } +} + +func TestSetCache_CacheableCase(t *testing.T) { + var s SideInputCache + err := s.Init(1) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + transID := "t1" + sideID := "s1" + tok := "tok1" + s.setValidToken(transID, sideID, tok) + input := makeTestReusableInput(transID, sideID, 10) + err = s.SetCache(transID, sideID, &input) + if err != nil { + t.Fatalf("SetCache failed when should have succeeded, got %v", err) + } + output := *(s.QueryCache(transID, sideID)) + if output == nil { + t.Fatalf("call to query cache missed when should have hit") + } + val, ok := output.Value().(int) + if !ok { + t.Errorf("failed to convert value to integer, got %v", output.Value()) + } + if val != 10 { + t.Errorf("element mismatch, expected 10, got %v", val) + } +} + +func makeRequest(transformID, sideInputID, token string) fnpb.ProcessBundleRequest_CacheToken { + var tok fnpb.ProcessBundleRequest_CacheToken + var wrap fnpb.ProcessBundleRequest_CacheToken_SideInput_ + var side fnpb.ProcessBundleRequest_CacheToken_SideInput + side.TransformId = transformID + side.SideInputId = sideInputID + wrap.SideInput = &side + tok.Type = &wrap + tok.Token = []byte(token) + return tok +} + +func TestSetValidTokens(t *testing.T) { + inputs := []struct { + transformID string + sideInputID string + token string + }{ + { + "t1", + "s1", + "tok1", + }, + { + "t2", + "s2", + "tok2", + }, + { + "t3", + "s3", + "tok3", + }, + } + + var s SideInputCache + err := s.Init(3) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + + var tokens []fnpb.ProcessBundleRequest_CacheToken + for _, input := range inputs { + tok := makeRequest(input.transformID, input.sideInputID, input.token) + tokens = append(tokens, tok) + } + + s.SetValidTokens(tokens...) + if len(s.validTokens) != len(inputs) { + t.Errorf("Missing tokens, expected %v, got %v", len(inputs), len(s.validTokens)) + } + + for i, input := range inputs { + // Check that the token is in the valid list + if !s.isValid(input.token) { + t.Errorf("error in input %v, token %v is not valid", i, input.token) + } + // Check that the mapping of IDs to tokens is correct + mapped := s.idsToTokens[input.transformID+input.sideInputID] + if mapped != input.token { + t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.token, mapped) + } + } +} + +func TestSetValidTokens_ClearingBetween(t *testing.T) { + inputs := []struct { + transformID string + sideInputID string + token string + }{ + { + "t1", + "s1", + "tok1", + }, + { + "t2", + "s2", + "tok2", + }, + { + "t3", + "s3", + "tok3", + }, + } + + var s SideInputCache + err := s.Init(1) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + + for i, input := range inputs { + tok := makeRequest(input.transformID, input.sideInputID, input.token) + + s.SetValidTokens(tok) + + // Check that only one valid token is in the pool + if len(s.validTokens) != 1 { + t.Errorf("Missing token, expected 1, got %v", len(s.validTokens)) + } + // Check that the token is in the valid list + if !s.isValid(input.token) { + t.Errorf("error in input %v, token %v is not valid", i, input.token) + } + // Check that the mapping of IDs to tokens is correct + mapped := s.idsToTokens[input.transformID+input.sideInputID] + if mapped != input.token { + t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.token, mapped) + } + } +} + +func TestSetCache_Eviction(t *testing.T) { + var s SideInputCache + err := s.Init(1) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + + tokOne := makeRequest("t1", "s1", "tok1") + inOne := makeTestReusableInput("t1", "s1", 10) + s.SetValidTokens(tokOne) + err = s.SetCache("t1", "s1", &inOne) + if err != nil { + t.Errorf("setting cache failed, got %v", err) + } + + tokTwo := makeRequest("t2", "s2", "tok2") + inTwo := makeTestReusableInput("t2", "s2", 20) + s.SetValidTokens(tokTwo) + err = s.SetCache("t2", "s2", &inTwo) + if err != nil { + t.Fatalf("setting cache failed, got %v", err) + } + + if len(s.cache) != 1 { + t.Errorf("cache size incorrect, expected 1, got %v", len(s.cache)) + } +} + +func TestSetCache_EvictionFailure(t *testing.T) { + var s SideInputCache + err := s.Init(1) + if err != nil { + t.Fatalf("cache init failed, got %v", err) + } + + tokOne := makeRequest("t1", "s1", "tok1") + inOne := makeTestReusableInput("t1", "s1", 10) + + tokTwo := makeRequest("t2", "s2", "tok2") + inTwo := makeTestReusableInput("t2", "s2", 20) + + s.SetValidTokens(tokOne, tokTwo) + err = s.SetCache("t1", "s1", &inOne) + if err != nil { + t.Errorf("setting cache failed, got %v", err) + } + // Should fail to evict because the first token is still valid + err = s.SetCache("t2", "s2", &inTwo) + if err == nil { + t.Errorf("setting cache succeeded when should have failed") + } + // Cache should not exceed size 1 + if len(s.cache) != 1 { + t.Errorf("cache size incorrect, expected 1, got %v", len(s.cache)) + } +} From face4348081753fb472ba512a70754ec8052ae2b Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 14:47:57 +0000 Subject: [PATCH 02/11] [BEAM-11097] Address review style nits --- .../runtime/harness/statecache/statecache.go | 19 +++++++++---------- .../harness/statecache/statecache_test.go | 14 +++++++------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 5c7da45d5544..6d2bbe192bed 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -29,7 +29,7 @@ import ( // eliminating redundant calls to the runner during execution of ParDos // using side inputs. type SideInputCache struct { - cache map[string]*exec.ReusableInput + cache map[string]exec.ReusableInput idsToTokens map[string]string validTokens []string mu sync.Mutex @@ -44,7 +44,7 @@ func (c *SideInputCache) Init(cap int) error { } c.mu.Lock() defer c.mu.Unlock() - c.cache = make(map[string]*exec.ReusableInput, cap) + c.cache = make(map[string]exec.ReusableInput, cap) c.idsToTokens = make(map[string]string) return nil } @@ -67,13 +67,12 @@ func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest // User State caching is currently not supported, so these tokens are ignored if tok.GetUserState() != nil { continue - } else { - s := tok.GetSideInput() - transformID := s.GetTransformId() - sideInputID := s.GetSideInputId() - token := string(tok.GetToken()) - c.setValidToken(transformID, sideInputID, token) } + s := tok.GetSideInput() + transformID := s.GetTransformId() + sideInputID := s.GetSideInputId() + token := string(tok.GetToken()) + c.setValidToken(transformID, sideInputID, token) } } @@ -105,7 +104,7 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) ( // input has been cached. A query having a bad token (e.g. one that doesn't make a known // token or one that makes a known but currently invalid token) is treated the same as a // cache miss. -func (c *SideInputCache) QueryCache(transformID, sideInputID string) *exec.ReusableInput { +func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReusableInput { c.mu.Lock() defer c.mu.Unlock() tok, ok := c.makeAndValidateToken(transformID, sideInputID) @@ -124,7 +123,7 @@ func (c *SideInputCache) QueryCache(transformID, sideInputID string) *exec.Reusa // with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token // then we silently do not cache the input, as this is an indication that the runner is treating that input // as uncacheable. -func (c *SideInputCache) SetCache(transformID, sideInputID string, input *exec.ReusableInput) error { +func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.ReusableInput) error { c.mu.Lock() defer c.mu.Unlock() tok, ok := c.makeAndValidateToken(transformID, sideInputID) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index 6d82c9817a5b..b4c809042879 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -85,7 +85,7 @@ func TestSetCache_UncacheableCase(t *testing.T) { t.Fatalf("cache init failed, got %v", err) } input := makeTestReusableInput("t1", "s1", 10) - err = s.SetCache("t1", "s1", &input) + err = s.SetCache("t1", "s1", input) if err != nil { t.Errorf("Set cache call failed, got %v", err) } @@ -106,11 +106,11 @@ func TestSetCache_CacheableCase(t *testing.T) { tok := "tok1" s.setValidToken(transID, sideID, tok) input := makeTestReusableInput(transID, sideID, 10) - err = s.SetCache(transID, sideID, &input) + err = s.SetCache(transID, sideID, input) if err != nil { t.Fatalf("SetCache failed when should have succeeded, got %v", err) } - output := *(s.QueryCache(transID, sideID)) + output := s.QueryCache(transID, sideID) if output == nil { t.Fatalf("call to query cache missed when should have hit") } @@ -248,7 +248,7 @@ func TestSetCache_Eviction(t *testing.T) { tokOne := makeRequest("t1", "s1", "tok1") inOne := makeTestReusableInput("t1", "s1", 10) s.SetValidTokens(tokOne) - err = s.SetCache("t1", "s1", &inOne) + err = s.SetCache("t1", "s1", inOne) if err != nil { t.Errorf("setting cache failed, got %v", err) } @@ -256,7 +256,7 @@ func TestSetCache_Eviction(t *testing.T) { tokTwo := makeRequest("t2", "s2", "tok2") inTwo := makeTestReusableInput("t2", "s2", 20) s.SetValidTokens(tokTwo) - err = s.SetCache("t2", "s2", &inTwo) + err = s.SetCache("t2", "s2", inTwo) if err != nil { t.Fatalf("setting cache failed, got %v", err) } @@ -280,12 +280,12 @@ func TestSetCache_EvictionFailure(t *testing.T) { inTwo := makeTestReusableInput("t2", "s2", 20) s.SetValidTokens(tokOne, tokTwo) - err = s.SetCache("t1", "s1", &inOne) + err = s.SetCache("t1", "s1", inOne) if err != nil { t.Errorf("setting cache failed, got %v", err) } // Should fail to evict because the first token is still valid - err = s.SetCache("t2", "s2", &inTwo) + err = s.SetCache("t2", "s2", inTwo) if err == nil { t.Errorf("setting cache succeeded when should have failed") } From 72fa25c914d61e4640d6979c8b78ef6f1a00c7b1 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 15:04:10 +0000 Subject: [PATCH 03/11] [BEAM-11097] Update comments --- .../beam/core/runtime/harness/statecache/statecache.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 6d2bbe192bed..f5fec0f1a2f1 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -15,6 +15,8 @@ // Package statecache implements the state caching feature described by the // Beam Fn API +// +// https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m package statecache import ( @@ -28,6 +30,14 @@ import ( // SideInputCache stores a cache of reusable inputs for the purposes of // eliminating redundant calls to the runner during execution of ParDos // using side inputs. +// +// A SideInputCache should be initialized when the SDK harness is initialized, +// creating storage for side input caching. On each ProcessBundleRequest, +// the cache will process the list of tokens for cacheable side inputs and +// be queried when side inputs are requested in bundle execution. Once a +// new bundle request comes in the valid tokens will be updated and the cache +// will be re-used. In the event that the cache reaches capacity, a random, +// currently invalid cached object will be evicted. type SideInputCache struct { cache map[string]exec.ReusableInput idsToTokens map[string]string From c93912dafed8bf29ccf373f852ffe4107963906d Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 15:39:12 +0000 Subject: [PATCH 04/11] [BEAM-11097] Refactor away from valid token slice, fix capacity bug --- .../runtime/harness/statecache/statecache.go | 23 +++++++------------ .../harness/statecache/statecache_test.go | 8 +++---- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index f5fec0f1a2f1..598d5871b70f 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -41,7 +41,6 @@ import ( type SideInputCache struct { cache map[string]exec.ReusableInput idsToTokens map[string]string - validTokens []string mu sync.Mutex capacity int } @@ -56,13 +55,14 @@ func (c *SideInputCache) Init(cap int) error { defer c.mu.Unlock() c.cache = make(map[string]exec.ReusableInput, cap) c.idsToTokens = make(map[string]string) + c.capacity = cap return nil } -// Completely clears the list of valid tokens. Should be called when +// Completely clears the map of valid tokens. Should be called when // starting to handle a new request. func (c *SideInputCache) clearValidTokens() { - c.validTokens = nil + c.idsToTokens = make(map[string]string) } // SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of @@ -87,27 +87,20 @@ func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest } // setValidToken adds a new valid token for a request into the SideInputCache struct -// and maps the transform ID and side input ID pairing to the cache token. +// by mapping the transform ID and side input ID pairing to the cache token. func (c *SideInputCache) setValidToken(transformID, sideInputID, token string) { idKey := transformID + sideInputID c.idsToTokens[idKey] = token - c.validTokens = append(c.validTokens, token) } func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (string, bool) { idKey := transformID + sideInputID - // Check if it's a known token + // Check if it's a known, valid token tok, ok := c.idsToTokens[idKey] if !ok { return "", false } - // Check if the known token is valid for this request - for _, t := range c.validTokens { - if t == tok { - return tok, true - } - } - return "", false + return tok, true } // QueryCache takes a transform ID and side input ID and checking if a corresponding side @@ -140,7 +133,7 @@ func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.Re if !ok { return nil } - if len(c.cache) > c.capacity { + if len(c.cache) >= c.capacity { err := c.evictElement() if err != nil { return errors.Errorf("Cache at or above capacity, got %v", err) @@ -151,7 +144,7 @@ func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.Re } func (c *SideInputCache) isValid(token string) bool { - for _, t := range c.validTokens { + for _, t := range c.idsToTokens { if t == token { return true } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index b4c809042879..138e20d5c24f 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -171,8 +171,8 @@ func TestSetValidTokens(t *testing.T) { } s.SetValidTokens(tokens...) - if len(s.validTokens) != len(inputs) { - t.Errorf("Missing tokens, expected %v, got %v", len(inputs), len(s.validTokens)) + if len(s.idsToTokens) != len(inputs) { + t.Errorf("Missing tokens, expected %v, got %v", len(inputs), len(s.idsToTokens)) } for i, input := range inputs { @@ -223,8 +223,8 @@ func TestSetValidTokens_ClearingBetween(t *testing.T) { s.SetValidTokens(tok) // Check that only one valid token is in the pool - if len(s.validTokens) != 1 { - t.Errorf("Missing token, expected 1, got %v", len(s.validTokens)) + if len(s.idsToTokens) != 1 { + t.Errorf("Missing token, expected 1, got %v", len(s.idsToTokens)) } // Check that the token is in the valid list if !s.isValid(input.token) { From f23b8b85474b3fbe6ab6b07f9f3884b1a294bf3a Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 16:03:21 +0000 Subject: [PATCH 05/11] [BEAM-11097] Add first run at metrics collection --- .../runtime/harness/statecache/statecache.go | 32 +++++++++++------ .../harness/statecache/statecache_test.go | 36 +++++++------------ 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 598d5871b70f..475b92515f42 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -43,6 +43,14 @@ type SideInputCache struct { idsToTokens map[string]string mu sync.Mutex capacity int + metrics CacheMetrics +} + +type CacheMetrics struct { + Hits int64 + Misses int64 + Evictions int64 + InUseEvictionCalls int64 } // Init makes the cache map and the map of IDs to cache tokens for the @@ -117,8 +125,11 @@ func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.Reusab // Check to see if cached input, ok := c.cache[tok] if !ok { + c.metrics.Misses++ return nil } + + c.metrics.Hits++ return input } @@ -126,21 +137,20 @@ func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.Reusab // with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token // then we silently do not cache the input, as this is an indication that the runner is treating that input // as uncacheable. -func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.ReusableInput) error { +func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.ReusableInput) { c.mu.Lock() defer c.mu.Unlock() tok, ok := c.makeAndValidateToken(transformID, sideInputID) if !ok { - return nil + return } if len(c.cache) >= c.capacity { - err := c.evictElement() - if err != nil { - return errors.Errorf("Cache at or above capacity, got %v", err) + evicted := c.evictElement() + if !evicted { + return } } c.cache[tok] = input - return nil } func (c *SideInputCache) isValid(token string) bool { @@ -154,21 +164,21 @@ func (c *SideInputCache) isValid(token string) bool { // evictElement randomly evicts a ReusableInput that is not currently valid from the cache. // It should only be called by a goroutine that obtained the lock in SetCache. -func (c *SideInputCache) evictElement() error { +func (c *SideInputCache) evictElement() bool { deleted := false // Select a key from the cache at random for k := range c.cache { // Do not evict an element if it's currently valid if !c.isValid(k) { delete(c.cache, k) + c.metrics.Evictions++ deleted = true break } } - // Nothing is deleted if every side input is still valid, meaning that the cache size - // is likely too small. + // Nothing is deleted if every side input is still valid. if !deleted { - return errors.Errorf("Failed to evict elements from cache, every element is currently valid") + c.metrics.InUseEvictionCalls++ } - return nil + return deleted } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index 138e20d5c24f..2c0ff32bfa76 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -85,10 +85,7 @@ func TestSetCache_UncacheableCase(t *testing.T) { t.Fatalf("cache init failed, got %v", err) } input := makeTestReusableInput("t1", "s1", 10) - err = s.SetCache("t1", "s1", input) - if err != nil { - t.Errorf("Set cache call failed, got %v", err) - } + s.SetCache("t1", "s1", input) output := s.QueryCache("t1", "s1") if output != nil { t.Errorf("Cache hit when should have missed, got %v", output) @@ -106,10 +103,7 @@ func TestSetCache_CacheableCase(t *testing.T) { tok := "tok1" s.setValidToken(transID, sideID, tok) input := makeTestReusableInput(transID, sideID, 10) - err = s.SetCache(transID, sideID, input) - if err != nil { - t.Fatalf("SetCache failed when should have succeeded, got %v", err) - } + s.SetCache(transID, sideID, input) output := s.QueryCache(transID, sideID) if output == nil { t.Fatalf("call to query cache missed when should have hit") @@ -248,22 +242,19 @@ func TestSetCache_Eviction(t *testing.T) { tokOne := makeRequest("t1", "s1", "tok1") inOne := makeTestReusableInput("t1", "s1", 10) s.SetValidTokens(tokOne) - err = s.SetCache("t1", "s1", inOne) - if err != nil { - t.Errorf("setting cache failed, got %v", err) - } + s.SetCache("t1", "s1", inOne) tokTwo := makeRequest("t2", "s2", "tok2") inTwo := makeTestReusableInput("t2", "s2", 20) s.SetValidTokens(tokTwo) - err = s.SetCache("t2", "s2", inTwo) - if err != nil { - t.Fatalf("setting cache failed, got %v", err) - } + s.SetCache("t2", "s2", inTwo) if len(s.cache) != 1 { t.Errorf("cache size incorrect, expected 1, got %v", len(s.cache)) } + if s.metrics.Evictions != 1 { + t.Errorf("number evictions incorrect, expected 1, got %v", s.metrics.Evictions) + } } func TestSetCache_EvictionFailure(t *testing.T) { @@ -280,17 +271,14 @@ func TestSetCache_EvictionFailure(t *testing.T) { inTwo := makeTestReusableInput("t2", "s2", 20) s.SetValidTokens(tokOne, tokTwo) - err = s.SetCache("t1", "s1", inOne) - if err != nil { - t.Errorf("setting cache failed, got %v", err) - } + s.SetCache("t1", "s1", inOne) // Should fail to evict because the first token is still valid - err = s.SetCache("t2", "s2", inTwo) - if err == nil { - t.Errorf("setting cache succeeded when should have failed") - } + s.SetCache("t2", "s2", inTwo) // Cache should not exceed size 1 if len(s.cache) != 1 { t.Errorf("cache size incorrect, expected 1, got %v", len(s.cache)) } + if s.metrics.InUseEvictionCalls != 1 { + t.Errorf("number of failed evicition calls incorrect, expected 1, got %v", s.metrics.InUseEvictionCalls) + } } From 32bd5be1139906d80933ad0aa8cb8f13a45a201d Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 17:38:58 +0000 Subject: [PATCH 06/11] [BEAM-11097] Add context to doc link --- sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 475b92515f42..2823c65eab90 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -16,6 +16,7 @@ // Package statecache implements the state caching feature described by the // Beam Fn API // +// The Beam State API and the intended caching behavior are described here: // https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m package statecache From cab82d44a0b24c6aefe97574ed61ff2fe2706c27 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 17:58:45 +0000 Subject: [PATCH 07/11] [BEAM-11097] Add comment to Init() --- sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 2823c65eab90..bfadedcd5609 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -55,7 +55,8 @@ type CacheMetrics struct { } // Init makes the cache map and the map of IDs to cache tokens for the -// SideInputCache. Should only be called once. +// SideInputCache. Should only be called once. Returns an error for +// non-positive capacities. func (c *SideInputCache) Init(cap int) error { if cap <= 0 { return errors.Errorf("capacity must be a positive integer, got %v", cap) From bc14e31af89ea26454dd0899c528957e440d583e Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 18:04:56 +0000 Subject: [PATCH 08/11] [BEAM-11097] Update metrics and in-use caching behavior --- .../runtime/harness/statecache/statecache.go | 25 ++++++++++--------- .../harness/statecache/statecache_test.go | 4 +-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index bfadedcd5609..81ebac5d9afb 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -48,10 +48,10 @@ type SideInputCache struct { } type CacheMetrics struct { - Hits int64 - Misses int64 - Evictions int64 - InUseEvictionCalls int64 + Hits int64 + Misses int64 + Evictions int64 + InUseEvictions int64 } // Init makes the cache map and the map of IDs to cache tokens for the @@ -147,10 +147,7 @@ func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.Re return } if len(c.cache) >= c.capacity { - evicted := c.evictElement() - if !evicted { - return - } + c.evictElement() } c.cache[tok] = input } @@ -166,7 +163,7 @@ func (c *SideInputCache) isValid(token string) bool { // evictElement randomly evicts a ReusableInput that is not currently valid from the cache. // It should only be called by a goroutine that obtained the lock in SetCache. -func (c *SideInputCache) evictElement() bool { +func (c *SideInputCache) evictElement() { deleted := false // Select a key from the cache at random for k := range c.cache { @@ -178,9 +175,13 @@ func (c *SideInputCache) evictElement() bool { break } } - // Nothing is deleted if every side input is still valid. + // Nothing is deleted if every side input is still valid. Clear + // out a random entry and record the in-use eviction if !deleted { - c.metrics.InUseEvictionCalls++ + for k := range c.cache { + delete(c.cache, k) + c.metrics.InUseEvictions++ + break + } } - return deleted } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index 2c0ff32bfa76..a700ed6a8f2d 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -278,7 +278,7 @@ func TestSetCache_EvictionFailure(t *testing.T) { if len(s.cache) != 1 { t.Errorf("cache size incorrect, expected 1, got %v", len(s.cache)) } - if s.metrics.InUseEvictionCalls != 1 { - t.Errorf("number of failed evicition calls incorrect, expected 1, got %v", s.metrics.InUseEvictionCalls) + if s.metrics.InUseEvictions != 1 { + t.Errorf("number of failed evicition calls incorrect, expected 1, got %v", s.metrics.InUseEvictions) } } From 0af0e2fd73e5a858706f3ffb1e20b457798a7494 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 18:15:28 +0000 Subject: [PATCH 09/11] [BEAM-11097] Add token type for clarity --- .../runtime/harness/statecache/statecache.go | 26 ++++++++------- .../harness/statecache/statecache_test.go | 32 +++++++++---------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 81ebac5d9afb..44fecac22b3e 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -28,6 +28,8 @@ import ( fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" ) +type token string + // SideInputCache stores a cache of reusable inputs for the purposes of // eliminating redundant calls to the runner during execution of ParDos // using side inputs. @@ -40,8 +42,8 @@ import ( // will be re-used. In the event that the cache reaches capacity, a random, // currently invalid cached object will be evicted. type SideInputCache struct { - cache map[string]exec.ReusableInput - idsToTokens map[string]string + cache map[token]exec.ReusableInput + idsToTokens map[string]token mu sync.Mutex capacity int metrics CacheMetrics @@ -63,8 +65,8 @@ func (c *SideInputCache) Init(cap int) error { } c.mu.Lock() defer c.mu.Unlock() - c.cache = make(map[string]exec.ReusableInput, cap) - c.idsToTokens = make(map[string]string) + c.cache = make(map[token]exec.ReusableInput, cap) + c.idsToTokens = make(map[string]token) c.capacity = cap return nil } @@ -72,7 +74,7 @@ func (c *SideInputCache) Init(cap int) error { // Completely clears the map of valid tokens. Should be called when // starting to handle a new request. func (c *SideInputCache) clearValidTokens() { - c.idsToTokens = make(map[string]string) + c.idsToTokens = make(map[string]token) } // SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of @@ -91,19 +93,19 @@ func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest s := tok.GetSideInput() transformID := s.GetTransformId() sideInputID := s.GetSideInputId() - token := string(tok.GetToken()) - c.setValidToken(transformID, sideInputID, token) + t := token(tok.GetToken()) + c.setValidToken(transformID, sideInputID, t) } } // setValidToken adds a new valid token for a request into the SideInputCache struct // by mapping the transform ID and side input ID pairing to the cache token. -func (c *SideInputCache) setValidToken(transformID, sideInputID, token string) { +func (c *SideInputCache) setValidToken(transformID, sideInputID string, tok token) { idKey := transformID + sideInputID - c.idsToTokens[idKey] = token + c.idsToTokens[idKey] = tok } -func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (string, bool) { +func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (token, bool) { idKey := transformID + sideInputID // Check if it's a known, valid token tok, ok := c.idsToTokens[idKey] @@ -152,9 +154,9 @@ func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.Re c.cache[tok] = input } -func (c *SideInputCache) isValid(token string) bool { +func (c *SideInputCache) isValid(tok token) bool { for _, t := range c.idsToTokens { - if t == token { + if t == tok { return true } } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index a700ed6a8f2d..c8442f36fa5a 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -100,7 +100,7 @@ func TestSetCache_CacheableCase(t *testing.T) { } transID := "t1" sideID := "s1" - tok := "tok1" + tok := token("tok1") s.setValidToken(transID, sideID, tok) input := makeTestReusableInput(transID, sideID, 10) s.SetCache(transID, sideID, input) @@ -117,7 +117,7 @@ func TestSetCache_CacheableCase(t *testing.T) { } } -func makeRequest(transformID, sideInputID, token string) fnpb.ProcessBundleRequest_CacheToken { +func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleRequest_CacheToken { var tok fnpb.ProcessBundleRequest_CacheToken var wrap fnpb.ProcessBundleRequest_CacheToken_SideInput_ var side fnpb.ProcessBundleRequest_CacheToken_SideInput @@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID, token string) fnpb.ProcessBundleReque side.SideInputId = sideInputID wrap.SideInput = &side tok.Type = &wrap - tok.Token = []byte(token) + tok.Token = []byte(t) return tok } @@ -133,7 +133,7 @@ func TestSetValidTokens(t *testing.T) { inputs := []struct { transformID string sideInputID string - token string + tok token }{ { "t1", @@ -160,8 +160,8 @@ func TestSetValidTokens(t *testing.T) { var tokens []fnpb.ProcessBundleRequest_CacheToken for _, input := range inputs { - tok := makeRequest(input.transformID, input.sideInputID, input.token) - tokens = append(tokens, tok) + t := makeRequest(input.transformID, input.sideInputID, input.tok) + tokens = append(tokens, t) } s.SetValidTokens(tokens...) @@ -171,13 +171,13 @@ func TestSetValidTokens(t *testing.T) { for i, input := range inputs { // Check that the token is in the valid list - if !s.isValid(input.token) { - t.Errorf("error in input %v, token %v is not valid", i, input.token) + if !s.isValid(input.tok) { + t.Errorf("error in input %v, token %v is not valid", i, input.tok) } // Check that the mapping of IDs to tokens is correct mapped := s.idsToTokens[input.transformID+input.sideInputID] - if mapped != input.token { - t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.token, mapped) + if mapped != input.tok { + t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.tok, mapped) } } } @@ -186,7 +186,7 @@ func TestSetValidTokens_ClearingBetween(t *testing.T) { inputs := []struct { transformID string sideInputID string - token string + tk token }{ { "t1", @@ -212,7 +212,7 @@ func TestSetValidTokens_ClearingBetween(t *testing.T) { } for i, input := range inputs { - tok := makeRequest(input.transformID, input.sideInputID, input.token) + tok := makeRequest(input.transformID, input.sideInputID, input.tk) s.SetValidTokens(tok) @@ -221,13 +221,13 @@ func TestSetValidTokens_ClearingBetween(t *testing.T) { t.Errorf("Missing token, expected 1, got %v", len(s.idsToTokens)) } // Check that the token is in the valid list - if !s.isValid(input.token) { - t.Errorf("error in input %v, token %v is not valid", i, input.token) + if !s.isValid(input.tk) { + t.Errorf("error in input %v, token %v is not valid", i, input.tk) } // Check that the mapping of IDs to tokens is correct mapped := s.idsToTokens[input.transformID+input.sideInputID] - if mapped != input.token { - t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.token, mapped) + if mapped != input.tk { + t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.tk, mapped) } } } From 3160d4e80b6b1b517250604cbd1bf950f05cd5d2 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 18:39:44 +0000 Subject: [PATCH 10/11] [BEAM-11097] Switch to count-based eviction system --- .../runtime/harness/statecache/statecache.go | 53 ++++++++++++++----- .../harness/statecache/statecache_test.go | 14 +++-- 2 files changed, 49 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 44fecac22b3e..4d66d3242817 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -44,6 +44,7 @@ type token string type SideInputCache struct { cache map[token]exec.ReusableInput idsToTokens map[string]token + validTokens map[token]int8 mu sync.Mutex capacity int metrics CacheMetrics @@ -67,16 +68,11 @@ func (c *SideInputCache) Init(cap int) error { defer c.mu.Unlock() c.cache = make(map[token]exec.ReusableInput, cap) c.idsToTokens = make(map[string]token) + c.validTokens = make(map[token]int8) c.capacity = cap return nil } -// Completely clears the map of valid tokens. Should be called when -// starting to handle a new request. -func (c *SideInputCache) clearValidTokens() { - c.idsToTokens = make(map[string]token) -} - // SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of // transform and side input IDs to cache tokens in the process. Should be called at the start of every // new ProcessBundleRequest. If the runner does not support caching, the passed cache token values @@ -84,7 +80,6 @@ func (c *SideInputCache) clearValidTokens() { func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) { c.mu.Lock() defer c.mu.Unlock() - c.clearValidTokens() for _, tok := range cacheTokens { // User State caching is currently not supported, so these tokens are ignored if tok.GetUserState() != nil { @@ -103,16 +98,46 @@ func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest func (c *SideInputCache) setValidToken(transformID, sideInputID string, tok token) { idKey := transformID + sideInputID c.idsToTokens[idKey] = tok + count, ok := c.validTokens[tok] + if !ok { + c.validTokens[tok] = 1 + } else { + c.validTokens[tok] = count + 1 + } +} + +// CompleteBundle takes the cache tokens passed to set the valid tokens and decrements their +// usage count for the purposes of maintaining a valid count of whether or not a value is +// still in use. Should be called once ProcessBundle has completed. +func (c *SideInputCache) CompleteBundle(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) { + c.mu.Lock() + defer c.mu.Unlock() + for _, tok := range cacheTokens { + // User State caching is currently not supported, so these tokens are ignored + if tok.GetUserState() != nil { + continue + } + t := token(tok.GetToken()) + c.decrementTokenCount(t) + } +} + +// decrementTokenCount decrements the validTokens entry for +// a given token by 1. Should only be called when completing +// a bundle. +func (c *SideInputCache) decrementTokenCount(tok token) { + count := c.validTokens[tok] + c.validTokens[tok] = count - 1 } func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (token, bool) { idKey := transformID + sideInputID - // Check if it's a known, valid token + // Check if it's a known token tok, ok := c.idsToTokens[idKey] if !ok { return "", false } - return tok, true + return tok, c.isValid(tok) } // QueryCache takes a transform ID and side input ID and checking if a corresponding side @@ -155,12 +180,12 @@ func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.Re } func (c *SideInputCache) isValid(tok token) bool { - for _, t := range c.idsToTokens { - if t == tok { - return true - } + count, ok := c.validTokens[tok] + // If the token is not known or not in use, return false + if !ok || count <= 0 { + return false } - return false + return true } // evictElement randomly evicts a ReusableInput that is not currently valid from the cache. diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go index c8442f36fa5a..b9970c398154 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go @@ -216,10 +216,6 @@ func TestSetValidTokens_ClearingBetween(t *testing.T) { s.SetValidTokens(tok) - // Check that only one valid token is in the pool - if len(s.idsToTokens) != 1 { - t.Errorf("Missing token, expected 1, got %v", len(s.idsToTokens)) - } // Check that the token is in the valid list if !s.isValid(input.tk) { t.Errorf("error in input %v, token %v is not valid", i, input.tk) @@ -229,6 +225,14 @@ func TestSetValidTokens_ClearingBetween(t *testing.T) { if mapped != input.tk { t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.tk, mapped) } + + s.CompleteBundle(tok) + } + + for k, _ := range s.validTokens { + if s.validTokens[k] != 0 { + t.Errorf("token count mismatch for token %v, expected 0, got %v", k, s.validTokens[k]) + } } } @@ -243,6 +247,8 @@ func TestSetCache_Eviction(t *testing.T) { inOne := makeTestReusableInput("t1", "s1", 10) s.SetValidTokens(tokOne) s.SetCache("t1", "s1", inOne) + // Mark bundle as complete, drop count for tokOne to 0 + s.CompleteBundle(tokOne) tokTwo := makeRequest("t2", "s2", "tok2") inTwo := makeTestReusableInput("t2", "s2", 20) From 9c6a55881cb54cb05f4706ea8a128a622baa5cf0 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 10 Sep 2021 21:58:05 +0000 Subject: [PATCH 11/11] [BEAM-11097] Fix nits, address validToken clearing --- .../runtime/harness/statecache/statecache.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 4d66d3242817..5496d8b81252 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -42,11 +42,11 @@ type token string // will be re-used. In the event that the cache reaches capacity, a random, // currently invalid cached object will be evicted. type SideInputCache struct { + capacity int + mu sync.Mutex cache map[token]exec.ReusableInput idsToTokens map[string]token - validTokens map[token]int8 - mu sync.Mutex - capacity int + validTokens map[token]int8 // Maps tokens to active bundle counts metrics CacheMetrics } @@ -127,7 +127,11 @@ func (c *SideInputCache) CompleteBundle(cacheTokens ...fnpb.ProcessBundleRequest // a bundle. func (c *SideInputCache) decrementTokenCount(tok token) { count := c.validTokens[tok] - c.validTokens[tok] = count - 1 + if count == 1 { + delete(c.validTokens, tok) + } else { + c.validTokens[tok] = count - 1 + } } func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (token, bool) { @@ -182,10 +186,7 @@ func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.Re func (c *SideInputCache) isValid(tok token) bool { count, ok := c.validTokens[tok] // If the token is not known or not in use, return false - if !ok || count <= 0 { - return false - } - return true + return ok && count > 0 } // evictElement randomly evicts a ReusableInput that is not currently valid from the cache.