diff --git a/cache/remotecache/v1/cachestorage.go b/cache/remotecache/v1/cachestorage.go index 6cd5e941a5da..1a1530259229 100644 --- a/cache/remotecache/v1/cachestorage.go +++ b/cache/remotecache/v1/cachestorage.go @@ -5,7 +5,6 @@ import ( "slices" "time" - "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" @@ -21,8 +20,11 @@ func NewCacheKeyStorage(cc *CacheChains, w worker.Worker) (solver.CacheKeyStorag byResult: map[string]map[string]struct{}{}, } - for _, it := range cc.items { - if _, err := addItemToStorage(storage, it); err != nil { + cc.computeIDs() + + for it := range cc.leaves() { + visited := make(map[*item]*itemWithOutgoingLinks) + if _, err := addItemToStorage(storage, it, visited); err != nil { return nil, nil, err } } @@ -37,7 +39,12 @@ func NewCacheKeyStorage(cc *CacheChains, w worker.Worker) (solver.CacheKeyStorag return storage, results, nil } -func addItemToStorage(k *cacheKeyStorage, it *item) (*itemWithOutgoingLinks, error) { +func addItemToStorage(k *cacheKeyStorage, it *item, visited map[*item]*itemWithOutgoingLinks) (*itemWithOutgoingLinks, error) { + if v, ok := visited[it]; ok { + return v, nil + } + visited[it] = nil + if id, ok := k.byItem[it]; ok { if id == "" { return nil, errors.Errorf("invalid loop") @@ -45,21 +52,18 @@ func addItemToStorage(k *cacheKeyStorage, it *item) (*itemWithOutgoingLinks, err return k.byID[id], nil } - var id string - if len(it.links) == 0 { - id = it.dgst.String() - } else { - id = identity.NewID() - } - + id := it.id k.byItem[it] = "" - for i, m := range it.links { + for i, m := range it.parents { for l := range m { - src, err := addItemToStorage(k, l.src) + src, err := addItemToStorage(k, l.src, visited) if err != nil { return nil, err } + if src == nil { + continue + } cl := nlink{ input: i, dgst: it.dgst, @@ -78,8 +82,13 @@ func addItemToStorage(k *cacheKeyStorage, it *item) (*itemWithOutgoingLinks, err k.byID[id] = itl - if res := it.result; res != nil { - resultID := remoteID(res) + seen := map[string]struct{}{} + for _, res := range it.results { + resultID := remoteID(res.Result) + if _, ok := seen[resultID]; ok { + continue + } + seen[resultID] = struct{}{} ids, ok := k.byResult[resultID] if !ok { ids = map[string]struct{}{} @@ -87,6 +96,7 @@ func addItemToStorage(k *cacheKeyStorage, it *item) (*itemWithOutgoingLinks, err } ids[id] = struct{}{} } + visited[it] = itl return itl, nil } @@ -120,21 +130,32 @@ func (cs *cacheKeyStorage) WalkResults(id string, fn func(solver.CacheResult) er if !ok { return nil } - if res := it.result; res != nil { - return fn(solver.CacheResult{ID: remoteID(res), CreatedAt: it.resultTime}) + seen := map[string]struct{}{} + for _, res := range it.results { + id := remoteID(res.Result) + if _, ok := seen[id]; ok { + continue + } + if err := fn(solver.CacheResult{ID: id, CreatedAt: res.CreatedAt}); err != nil { + return err + } + seen[id] = struct{}{} } return nil } func (cs *cacheKeyStorage) Load(id string, resultID string) (solver.CacheResult, error) { - it, ok := cs.byID[id] - if !ok { - return solver.CacheResult{}, nil - } - if res := it.result; res != nil { - return solver.CacheResult{ID: remoteID(res), CreatedAt: it.resultTime}, nil + var res solver.CacheResult + if err := cs.WalkResults(id, func(r solver.CacheResult) error { + if r.ID == resultID { + res = r + return nil + } + return nil + }); err != nil { + return solver.CacheResult{}, errors.Wrapf(err, "failed to load cache result for %s", id) } - return solver.CacheResult{}, nil + return res, nil } func (cs *cacheKeyStorage) AddResult(id string, res solver.CacheResult) error { @@ -250,28 +271,35 @@ func (cs *cacheResultStorage) LoadWithParents(ctx context.Context, res solver.Ca for id := range ids { v, ok := cs.byID[id] - if ok && v.result != nil { - if err := v.walkAllResults(func(i *item) error { - if i.result == nil { - return nil - } - id, ok := cs.byItem[i] - if !ok { - return nil - } - if isSubRemote(*i.result, *v.result) { - ref, err := cs.w.FromRemote(ctx, i.result) - if err != nil { - return err + if ok { + if _, ok := visited[v.item]; ok { + continue + } + for _, result := range v.results { + resultID := remoteID(result.Result) + if resultID == res.ID { + if err := v.walkAllResults(func(i *item) error { + for _, subRes := range i.results { + id, ok := cs.byItem[i] + if !ok { + return nil + } + if isSubRemote(*subRes.Result, *result.Result) { + ref, err := cs.w.FromRemote(ctx, subRes.Result) + if err != nil { + return err + } + m[id] = worker.NewWorkerRefResult(ref, cs.w) + } + } + return nil + }, visited); err != nil { + for _, v := range m { + v.Release(context.TODO()) + } + return nil, err } - m[id] = worker.NewWorkerRefResult(ref, cs.w) } - return nil - }, visited); err != nil { - for _, v := range m { - v.Release(context.TODO()) - } - return nil, err } } } @@ -281,34 +309,43 @@ func (cs *cacheResultStorage) LoadWithParents(ctx context.Context, res solver.Ca func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult) (solver.Result, error) { item := cs.byResultID(res.ID) - if item == nil || item.result == nil { - return nil, errors.WithStack(solver.ErrNotFound) - } - - ref, err := cs.w.FromRemote(ctx, item.result) - if err != nil { - return nil, errors.Wrap(err, "failed to load result from remote") + for _, r := range item.results { + resultID := remoteID(r.Result) + if resultID != res.ID { + continue + } + ref, err := cs.w.FromRemote(ctx, r.Result) + if err != nil { + return nil, errors.Wrap(err, "failed to load result from remote") + } + return worker.NewWorkerRefResult(ref, cs.w), nil } - return worker.NewWorkerRefResult(ref, cs.w), nil + return nil, errors.WithStack(solver.ErrNotFound) } func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopts *compression.Config, _ session.Group) ([]*solver.Remote, error) { - if r := cs.byResultID(res.ID); r != nil && r.result != nil { - if compressionopts == nil { - return []*solver.Remote{r.result}, nil - } - // Any of blobs in the remote must meet the specified compression option. - match := false - for _, desc := range r.result.Descriptors { - m := compression.IsMediaType(compressionopts.Type, desc.MediaType) - match = match || m - if compressionopts.Force && !m { - match = false - break + if it := cs.byResultID(res.ID); it != nil { + for _, r := range it.results { + if compressionopts == nil { + resultID := remoteID(r.Result) + if resultID != res.ID { + continue + } + return []*solver.Remote{r.Result}, nil + } + // Any of blobs in the remote must meet the specified compression option. + match := false + for _, desc := range r.Result.Descriptors { + m := compression.IsMediaType(compressionopts.Type, desc.MediaType) + match = match || m + if compressionopts.Force && !m { + match = false + break + } + } + if match { + return []*solver.Remote{r.Result}, nil } - } - if match { - return []*solver.Remote{r.result}, nil } return nil, nil // return nil as it's best effort. } diff --git a/cache/remotecache/v1/chains.go b/cache/remotecache/v1/chains.go index ca318ba22606..7f80399e53d2 100644 --- a/cache/remotecache/v1/chains.go +++ b/cache/remotecache/v1/chains.go @@ -2,10 +2,13 @@ package cacheimport import ( "context" + "encoding/binary" + "maps" + "slices" "strings" - "sync" - "time" + "unique" + "github.com/cespare/xxhash/v2" "github.com/containerd/containerd/v2/core/content" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" @@ -15,72 +18,187 @@ import ( ) func NewCacheChains() *CacheChains { - return &CacheChains{visited: map[any]struct{}{}} + return &CacheChains{roots: map[digest.Digest]*item{}} } type CacheChains struct { - items []*item - visited map[any]struct{} + roots map[digest.Digest]*item } var _ solver.CacheExporterTarget = &CacheChains{} -func (c *CacheChains) Add(dgst digest.Digest) solver.CacheExporterRecord { - if strings.HasPrefix(dgst.String(), "random:") { - // random digests will be different *every* run - so we shouldn't cache - // it, since there's a zero chance this random digest collides again - return &nopRecord{} +func (c *CacheChains) computeIDs() { + for it := range c.leaves() { + it.computeID() } - - it := &item{dgst: dgst, backlinks: map[*item]struct{}{}} - c.items = append(c.items, it) - return it } -func (c *CacheChains) Visit(target any) { - c.visited[target] = struct{}{} +func (c *CacheChains) leaves() map[*item]struct{} { + leafs := map[*item]struct{}{} + visited := map[*item]struct{}{} + for _, it := range c.roots { + it.walkChildren(func(i *item) error { + if len(i.children) == 0 { + leafs[i] = struct{}{} + } + return nil + }, visited) + } + return leafs } -func (c *CacheChains) Visited(target any) bool { - _, ok := c.visited[target] - return ok -} +func (c *CacheChains) Add(dgst digest.Digest, deps [][]solver.CacheLink, results []solver.CacheExportResult) (solver.CacheExporterRecord, bool, error) { + if strings.HasPrefix(dgst.String(), "random:") { + return nil, false, nil + } + r := &item{ + dgst: dgst, + results: results, + cc: c, + } -func (c *CacheChains) normalize(ctx context.Context) error { - st := &normalizeState{ - added: map[*item]*item{}, - links: map[*item]map[nlink]map[digest.Digest]struct{}{}, - byKey: map[digest.Digest]*item{}, + if len(deps) == 0 { + if prev, ok := c.roots[dgst]; ok { + r = prev + } + for _, rr := range results { + r.addResult(rr) + } + c.roots[dgst] = r + return r, true, nil } - validated := make([]*item, 0, len(c.items)) - for _, it := range c.items { - it.backlinksMu.Lock() - it.validate() - it.backlinksMu.Unlock() + matchDeps := make([]func() map[*item]struct{}, len(deps)) + for i, dd := range deps { + if len(dd) == 0 { + return nil, false, errors.Errorf("empty dependency for %s", dgst) + } + type itemWithSelector struct { + Src *item + Selector string + } + items := make([]itemWithSelector, len(dd)) + for ii, d := range dd { + it, ok := d.Src.(*item) + if !ok { + return nil, false, errors.Errorf("invalid dependency type %T for %s", d.Src, dgst) + } + if it.cc != c { + return nil, false, errors.Errorf("dependency %s is not part of the same cache chain", it.dgst) + } + items[ii] = itemWithSelector{ + Src: it, + Selector: d.Selector, + } + } + matchDeps[i] = func() map[*item]struct{} { + candidates := map[*item]struct{}{} + for _, it := range items { + maps.Copy(candidates, it.Src.children[unique.Make(linkv2{ + selector: it.Selector, + index: i, + digest: dgst, + })]) + } + return candidates + } } - for _, it := range c.items { - if !it.invalid { - validated = append(validated, it) + items := IntersectAll(matchDeps) + + if len(items) > 1 { + var main *item + for it := range items { + main = it + break } + for it := range items { + if it == main { + continue + } + for l, m := range it.children { + if main.children == nil { + main.children = map[unique.Handle[linkv2]]map[*item]struct{}{} + } + if _, ok := main.children[l]; !ok { + main.children[l] = map[*item]struct{}{} + } + for ch := range m { + main.children[l][ch] = struct{}{} + for i, links := range ch.parents { + newlinks := map[link]struct{}{} + for l := range links { + if l.src == it { + l.src = main + } + newlinks[l] = struct{}{} + } + ch.parents[i] = newlinks + } + } + } + for _, rr := range it.results { + main.addResult(rr) + } + } + items = map[*item]struct{}{main: {}} } - c.items = validated - for _, it := range c.items { - _, err := normalizeItem(it, st) - if err != nil { - return err + for it := range items { + r = it + for _, rr := range results { + r.addResult(rr) + } + + // make sure that none of the deps are children of r + allChildren := map[*item]struct{}{} + if err := r.walkChildren(func(i *item) error { + allChildren[i] = struct{}{} + return nil + }, map[*item]struct{}{}); err != nil { + return nil, false, errors.Wrapf(err, "failed to walk children of %s", dgst) + } + for i, dd := range deps { + for j, d := range dd { + if _, ok := allChildren[d.Src.(*item)]; ok { + deps[i][j].Src = nil + } + } + } + break + } + for i, dd := range deps { + for _, d := range dd { + if d.Src == nil { + continue + } + d.Src.(*item).addChild(r, i, d.Selector) } } + return r, true, nil +} + +func IntersectAll[T comparable]( + funcs []func() map[T]struct{}, +) map[T]struct{} { + if len(funcs) == 0 { + return nil + } - st.removeLoops(ctx) + intersection := funcs[0]() - items := make([]*item, 0, len(st.byKey)) - for _, it := range st.byKey { - items = append(items, it) + for _, f := range funcs[1:] { + next := f() + for k := range intersection { + if _, ok := next[k]; !ok { + delete(intersection, k) + } + } + if len(intersection) == 0 { + return nil + } } - c.items = items - return nil + + return intersection } // Marshal converts the cache chains structure into a cache config and a @@ -90,17 +208,13 @@ func (c *CacheChains) normalize(ctx context.Context) error { // consistent digest (since cache configs are typically uploaded and stored in // content-addressable OCI registries). func (c *CacheChains) Marshal(ctx context.Context) (*CacheConfig, DescriptorProvider, error) { - if err := c.normalize(ctx); err != nil { - return nil, nil, err - } - st := &marshalState{ chainsByID: map[string]int{}, descriptors: DescriptorProvider{}, recordsByItem: map[*item]int{}, } - for _, it := range c.items { + for it := range c.leaves() { if err := marshalItem(ctx, it, st); err != nil { return nil, nil, err } @@ -160,34 +274,33 @@ func (p DescriptorProviderPair) SnapshotLabels(descs []ocispecs.Descriptor, inde return nil } +type linkv2 struct { + selector string + index int + digest digest.Digest +} + // item is an implementation of a record in the cache chain. After validation, // normalization and marshalling into the cache config, the item results form // into the "layers", while the digests and the links form into the "records". type item struct { + solver.CacheExporterRecordBase + + id string + // dgst is the unique identifier for each record. // This *roughly* corresponds to an edge (vertex cachekey + index) in the // solver - however, a single vertex can produce multiple unique cache keys // (e.g. fast/slow), so it's a one-to-many relation. dgst digest.Digest - // links are what connect records to each other (with an optional selector), - // organized by input index (which correspond to vertex inputs). - // We can have multiple links for each index, since *any* of these could be - // used to get to this item (e.g. we could retrieve by fast/slow key). - links []map[link]struct{} + children map[unique.Handle[linkv2]]map[*item]struct{} - // backlinks are the inverse of a link - these don't actually get directly - // exported, but they're internally used to help efficiently navigate the - // graph. - backlinks map[*item]struct{} - backlinksMu sync.Mutex + parents []map[link]struct{} - // result is the result of computing the edge - this is the target of the - // data we actually want to store in the cache chain. - result *solver.Remote - resultTime time.Time + results []solver.CacheExportResult - invalid bool + cc *CacheChains } // link is a pointer to an item, with an optional selector. @@ -196,88 +309,110 @@ type link struct { selector string } -func (c *item) removeLink(src *item) bool { - found := false - for idx := range c.links { - for l := range c.links[idx] { - if l.src == src { - delete(c.links[idx], l) - found = true - } - } - } - for idx := range c.links { - if len(c.links[idx]) == 0 { - c.links = nil - break - } +func (c *item) addChild(src *item, index int, selector string) { + if c.children == nil { + c.children = map[unique.Handle[linkv2]]map[*item]struct{}{} } - return found -} - -func (c *item) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) { - c.resultTime = createdAt - c.result = result -} - -func (c *item) LinkFrom(rec solver.CacheExporterRecord, index int, selector string) { - src, ok := rec.(*item) + h := unique.Make(linkv2{ + selector: selector, + index: index, + digest: src.dgst, + }) + m, ok := c.children[h] if !ok { + m = map[*item]struct{}{} + c.children[h] = m + } + if _, ok := m[src]; ok { return } + m[src] = struct{}{} - for index >= len(c.links) { - c.links = append(c.links, map[link]struct{}{}) + for index >= len(src.parents) { + src.parents = append(src.parents, map[link]struct{}{}) } + src.parents[index][link{src: c, selector: selector}] = struct{}{} +} - c.links[index][link{src: src, selector: selector}] = struct{}{} - src.backlinksMu.Lock() - src.backlinks[c] = struct{}{} - src.backlinksMu.Unlock() +func (c *item) addResult(r solver.CacheExportResult) { + var exists bool + for _, rr := range c.results { + if !rr.CreatedAt.Equal(r.CreatedAt) { + continue + } + if len(rr.Result.Descriptors) != len(r.Result.Descriptors) { + continue + } + for i, d := range rr.Result.Descriptors { + if d.Digest != r.Result.Descriptors[i].Digest { + continue + } + } + exists = true + break + } + if !exists { + c.results = append(c.results, r) + } } -// validate checks if an item is valid (i.e. each index has at least one link) -// and marks it as such. -// -// Essentially, if an index has no links, it means that this cache record is -// unreachable by the cache importer, so we should remove it. Once we've marked -// an item as invalid, we remove it from it's backlinks and check it's -// validity again - since now this linked item may be unreachable too. -func (c *item) validate() { - if c.invalid { - // early exit, if the item is already invalid, we've already gone - // through the backlinks +func (c *item) computeID() { + if c.id != "" { return } - for _, m := range c.links { - // if an index has no links, there's no way to access this record, so - // mark it as invalid - if len(m) == 0 { - c.invalid = true - break - } + if len(c.parents) == 0 { + c.id = c.dgst.String() + return } - if c.invalid { - for bl := range c.backlinks { - // remove ourselves from the backlinked item - changed := false - for _, m := range bl.links { - for l := range m { - if l.src == c { - delete(m, l) - changed = true - } - } + // deterministic ID + h := xxhash.New() + h.Write([]byte(c.dgst.String())) + h.Write([]byte{0}) + + for idx, m := range c.parents { + binary.Write(h, binary.LittleEndian, uint32(idx)) + h.Write([]byte{0}) + for l := range m { + if l.src.id == "" { + l.src.computeID() } + h.Write([]byte(l.src.id)) + h.Write([]byte{0}) + h.Write([]byte(l.selector)) + h.Write([]byte{0}) + } + } + c.id = string(h.Sum(nil)) +} + +func (c *item) bestResult() *solver.CacheExportResult { + if len(c.results) == 0 { + return nil + } + slices.SortFunc(c.results, func(a, b solver.CacheExportResult) int { + return b.CreatedAt.Compare(a.CreatedAt) + }) + return &c.results[0] +} - // if we've removed ourselves, we need to check it again - if changed { - bl.validate() +func (c *item) walkChildren(fn func(i *item) error, visited map[*item]struct{}) error { + if _, ok := visited[c]; ok { + return nil + } + visited[c] = struct{}{} + if err := fn(c); err != nil { + return err + } + for _, ch := range c.children { + for it := range ch { + if err := it.walkChildren(fn, visited); err != nil { + return err } } } + return nil } func (c *item) walkAllResults(fn func(i *item) error, visited map[*item]struct{}) error { @@ -288,7 +423,7 @@ func (c *item) walkAllResults(fn func(i *item) error, visited map[*item]struct{} if err := fn(c); err != nil { return err } - for _, links := range c.links { + for _, links := range c.parents { for l := range links { if err := l.src.walkAllResults(fn, visited); err != nil { return err @@ -297,13 +432,3 @@ func (c *item) walkAllResults(fn func(i *item) error, visited map[*item]struct{} } return nil } - -// nopRecord is used to discard cache results that we're not interested in storing. -type nopRecord struct { -} - -func (c *nopRecord) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) { -} - -func (c *nopRecord) LinkFrom(rec solver.CacheExporterRecord, index int, selector string) { -} diff --git a/cache/remotecache/v1/chains_test.go b/cache/remotecache/v1/chains_test.go index 561fe13422ae..9182639e89ad 100644 --- a/cache/remotecache/v1/chains_test.go +++ b/cache/remotecache/v1/chains_test.go @@ -15,13 +15,15 @@ import ( func TestSimpleMarshal(t *testing.T) { cc := NewCacheChains() + now := time.Now() addRecords := func() { - foo := cc.Add(outputKey(dgst("foo"), 0)) - bar := cc.Add(outputKey(dgst("bar"), 1)) - baz := cc.Add(outputKey(dgst("baz"), 0)) + foo, ok, err := cc.Add(outputKey(dgst("foo"), 0), nil, nil) + require.NoError(t, err) + require.True(t, ok) + bar, ok, err := cc.Add(outputKey(dgst("bar"), 1), nil, nil) + require.NoError(t, err) + require.True(t, ok) - baz.LinkFrom(foo, 0, "") - baz.LinkFrom(bar, 1, "sel0") r0 := &solver.Remote{ Descriptors: []ocispecs.Descriptor{{ Digest: dgst("d0"), @@ -29,7 +31,16 @@ func TestSimpleMarshal(t *testing.T) { Digest: dgst("d1"), }}, } - baz.AddResult("", 0, time.Now(), r0) + + _, ok, err = cc.Add(outputKey(dgst("baz"), 0), [][]solver.CacheLink{ + {{Src: foo, Selector: ""}}, + {{Src: bar, Selector: "sel0"}}, + }, []solver.CacheExportResult{{ + CreatedAt: now, + Result: r0, + }}) + require.NoError(t, err) + require.True(t, ok) } addRecords() @@ -84,7 +95,9 @@ func TestSimpleMarshal(t *testing.T) { require.Equal(t, cfg, cfg3) // add extra item - cc.Add(outputKey(dgst("bay"), 0)) + _, ok, err := cc.Add(outputKey(dgst("bay"), 0), nil, nil) + require.NoError(t, err) + require.True(t, ok) cfg, _, err = cc.Marshal(context.TODO()) require.NoError(t, err) diff --git a/cache/remotecache/v1/parse.go b/cache/remotecache/v1/parse.go index 44b1645b9640..ff30d9b624a6 100644 --- a/cache/remotecache/v1/parse.go +++ b/cache/remotecache/v1/parse.go @@ -37,23 +37,32 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver. return r, nil } + cache[idx] = nil if idx < 0 || idx >= len(cc.Records) { return nil, errors.Errorf("invalid record ID: %d", idx) } rec := cc.Records[idx] - r := t.Add(rec.Digest) - cache[idx] = nil + links := make([][]solver.CacheLink, len(rec.Inputs)) + for i, inputs := range rec.Inputs { - for _, inp := range inputs { + if len(inputs) == 0 { + return nil, errors.Errorf("invalid empty input for record %d", idx) + } + links[i] = make([]solver.CacheLink, len(inputs)) + for j, inp := range inputs { src, err := parseRecord(cc, inp.LinkIndex, provider, t, cache) if err != nil { return nil, err } - r.LinkFrom(src, i, inp.Selector) + links[i][j] = solver.CacheLink{ + Selector: inp.Selector, + Src: src, + } } } + results := make([]solver.CacheExportResult, 0, len(rec.Results)) for _, res := range rec.Results { visited := map[int]struct{}{} remote, err := getRemoteChain(cc.Layers, res.LayerIndex, provider, visited) @@ -61,10 +70,12 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver. return nil, err } if remote != nil { - r.AddResult("", 0, res.CreatedAt, remote) + results = append(results, solver.CacheExportResult{ + CreatedAt: res.CreatedAt, + Result: remote, + }) } } - for _, res := range rec.ChainedResults { remote := &solver.Remote{} mp := contentutil.NewMultiProvider(nil) @@ -86,10 +97,17 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver. } if remote != nil { remote.Provider = mp - r.AddResult("", 0, res.CreatedAt, remote) + results = append(results, solver.CacheExportResult{ + CreatedAt: res.CreatedAt, + Result: remote, + }) } } + r, _, err := t.Add(rec.Digest, links, results) + if err != nil { + return nil, errors.Wrapf(err, "failed to add record %d", idx) + } cache[idx] = r return r, nil } diff --git a/cache/remotecache/v1/utils.go b/cache/remotecache/v1/utils.go index cf014913e985..59eb9e556a41 100644 --- a/cache/remotecache/v1/utils.go +++ b/cache/remotecache/v1/utils.go @@ -9,7 +9,6 @@ import ( cerrdefs "github.com/containerd/errdefs" "github.com/moby/buildkit/solver" - "github.com/moby/buildkit/util/bklog" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -118,149 +117,6 @@ type nlink struct { input int selector string } -type normalizeState struct { - added map[*item]*item - links map[*item]map[nlink]map[digest.Digest]struct{} - byKey map[digest.Digest]*item - next int -} - -func (s *normalizeState) removeLoops(ctx context.Context) { - roots := []digest.Digest{} - for dgst, it := range s.byKey { - if len(it.links) == 0 { - roots = append(roots, dgst) - } - } - - visited := map[digest.Digest]struct{}{} - - for _, d := range roots { - s.checkLoops(ctx, d, visited) - } -} - -func (s *normalizeState) checkLoops(ctx context.Context, d digest.Digest, visited map[digest.Digest]struct{}) { - it, ok := s.byKey[d] - if !ok { - return - } - links, ok := s.links[it] - if !ok { - return - } - visited[d] = struct{}{} - defer func() { - delete(visited, d) - }() - - for l, ids := range links { - for id := range ids { - if _, ok := visited[id]; ok { - it2, ok := s.byKey[id] - if !ok { - continue - } - if !it2.removeLink(it) { - bklog.G(ctx).Warnf("failed to remove looping cache key %s %s", d, id) - } - delete(links[l], id) - } else { - s.checkLoops(ctx, id, visited) - } - } - } -} - -func normalizeItem(it *item, state *normalizeState) (*item, error) { - if it2, ok := state.added[it]; ok { - return it2, nil - } - - if len(it.links) == 0 { - id := it.dgst - if it2, ok := state.byKey[id]; ok { - state.added[it] = it2 - return it2, nil - } - state.byKey[id] = it - state.added[it] = it - return nil, nil - } - - matches := map[digest.Digest]struct{}{} - - // check if there is already a matching record - for i, m := range it.links { - if len(m) == 0 { - return nil, errors.Errorf("invalid incomplete links") - } - for l := range m { - nl := nlink{dgst: it.dgst, input: i, selector: l.selector} - it2, err := normalizeItem(l.src, state) - if err != nil { - return nil, err - } - links := state.links[it2][nl] - if i == 0 { - for id := range links { - matches[id] = struct{}{} - } - } else { - for id := range matches { - if _, ok := links[id]; !ok { - delete(matches, id) - } - } - } - } - } - - var id digest.Digest - - links := it.links - - if len(matches) > 0 { - for m := range matches { - if id == "" || id > m { - id = m - } - } - } else { - // keep tmp IDs deterministic - state.next++ - id = digest.FromBytes(fmt.Appendf(nil, "%d", state.next)) - state.byKey[id] = it - it.links = make([]map[link]struct{}, len(it.links)) - for i := range it.links { - it.links[i] = map[link]struct{}{} - } - } - - it2 := state.byKey[id] - state.added[it] = it2 - - for i, m := range links { - for l := range m { - subIt, err := normalizeItem(l.src, state) - if err != nil { - return nil, err - } - it2.links[i][link{src: subIt, selector: l.selector}] = struct{}{} - - nl := nlink{dgst: it.dgst, input: i, selector: l.selector} - if _, ok := state.links[subIt]; !ok { - state.links[subIt] = map[nlink]map[digest.Digest]struct{}{} - } - if _, ok := state.links[subIt][nl]; !ok { - state.links[subIt][nl] = map[digest.Digest]struct{}{} - } - state.links[subIt][nl][id] = struct{}{} - } - } - - return it2, nil -} type marshalState struct { layers []CacheLayer @@ -323,13 +179,14 @@ func marshalItem(ctx context.Context, it *item, state *marshalState) error { if _, ok := state.recordsByItem[it]; ok { return nil } + state.recordsByItem[it] = -1 rec := CacheRecord{ Digest: it.dgst, - Inputs: make([][]CacheInput, len(it.links)), + Inputs: make([][]CacheInput, len(it.parents)), } - for i, m := range it.links { + for i, m := range it.parents { for l := range m { if err := marshalItem(ctx, l.src, state); err != nil { return err @@ -338,6 +195,9 @@ func marshalItem(ctx context.Context, it *item, state *marshalState) error { if !ok { return errors.Errorf("invalid source record: %v", l.src) } + if idx == -1 { + continue + } rec.Inputs[i] = append(rec.Inputs[i], CacheInput{ Selector: l.selector, LinkIndex: idx, @@ -345,14 +205,14 @@ func marshalItem(ctx context.Context, it *item, state *marshalState) error { } } - if it.result != nil { - id := marshalRemote(ctx, it.result, state) + if res := it.bestResult(); res != nil { + id := marshalRemote(ctx, res.Result, state) if id != "" { idx, ok := state.chainsByID[id] if !ok { return errors.Errorf("parent chainid not found") } - rec.Results = append(rec.Results, CacheResult{LayerIndex: idx, CreatedAt: it.resultTime}) + rec.Results = append(rec.Results, CacheResult{LayerIndex: idx, CreatedAt: res.CreatedAt}) } } diff --git a/go.mod b/go.mod index 32cffc11af77..dd182fb295c1 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.27 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.8 github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2 + github.com/cespare/xxhash/v2 v2.3.0 github.com/containerd/accelerated-container-image v1.3.0 github.com/containerd/console v1.0.5 github.com/containerd/containerd/api v1.9.0 @@ -130,7 +131,6 @@ require ( github.com/aws/smithy-go v1.20.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/cgroups/v3 v3.0.5 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/fifo v1.1.0 // indirect diff --git a/solver/edge.go b/solver/edge.go index 389197e009f9..94ff753f3a56 100644 --- a/solver/edge.go +++ b/solver/edge.go @@ -946,19 +946,19 @@ func (e *edge) loadCache(ctx context.Context) (any, error) { e.cacheRecordsLoaded[rec.ID] = struct{}{} bklog.G(ctx).Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID) - res, err := e.op.LoadCache(ctx, rec) + res, ctxOpts, err := e.op.LoadCache(ctx, rec) if err != nil { bklog.G(ctx).Debugf("load cache for %s err: %v", e.edge.Vertex.Name(), err) return nil, errors.Wrap(err, "failed to load cache") } - return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil + return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e, recordCtxOpts: ctxOpts}}}), nil } // execOp creates a request to execute the vertex operation func (e *edge) execOp(ctx context.Context) (any, error) { cacheKeys, inputs := e.commitOptions() - results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs)) + results, subExporters, ctxOpts, err := e.op.Exec(ctx, toResultSlice(inputs)) if err != nil { return nil, errors.WithStack(err) } @@ -986,6 +986,7 @@ func (e *edge) execOp(ctx context.Context) (any, error) { if exp, ok := ck.Exporter.(*exporter); ok { exp.edge = e + exp.recordCtxOpts = ctxOpts } exps := make([]CacheExporter, 0, len(subExporters)) diff --git a/solver/exporter.go b/solver/exporter.go index 29b23ac61d13..3f6dc1ccf5a4 100644 --- a/solver/exporter.go +++ b/solver/exporter.go @@ -9,48 +9,85 @@ import ( ) type exporter struct { - k *CacheKey - records []*CacheRecord - record *CacheRecord + k *CacheKey + records []*CacheRecord + record *CacheRecord + recordCtxOpts func(context.Context) context.Context edge *edge // for secondaryExporters override *bool } -func addBacklinks(t CacheExporterTarget, rec CacheExporterRecord, cm *cacheManager, id string, bkm map[string]CacheExporterRecord) (CacheExporterRecord, error) { - if rec == nil { - var ok bool - rec, ok = bkm[id] - if ok && rec != nil { - return rec, nil - } - _ = ok +func addBacklinks(t CacheExporterTarget, cm *cacheManager, id string, bkm map[string][]CacheExporterRecord) ([]CacheExporterRecord, error) { + out, ok := bkm[id] + if ok && out != nil { + return out, nil + } else if ok && out == nil { + return nil, nil } bkm[id] = nil + + m := map[digest.Digest][][]CacheLink{} + isRoot := true if err := cm.backend.WalkBacklinks(id, func(id string, link CacheInfoLink) error { - if rec == nil { - rec = t.Add(link.Digest) + isRoot = false + recs, err := addBacklinks(t, cm, id, bkm) + if err != nil { // TODO: should we continue on error? + return err } - r, ok := bkm[id] - if !ok { - var err error - r, err = addBacklinks(t, nil, cm, id, bkm) - if err != nil { - return err - } + links := m[link.Digest] + for int(link.Input) >= len(links) { + links = append(links, nil) } - if r != nil { - rec.LinkFrom(r, int(link.Input), link.Selector.String()) + for _, rec := range recs { + links[int(link.Input)] = append(links[int(link.Input)], CacheLink{Src: rec, Selector: link.Selector.String()}) } + m[link.Digest] = links return nil }); err != nil { return nil, err } - if rec == nil { - rec = t.Add(digest.Digest(id)) + + if isRoot { + dgst, err := digest.Parse(id) + if err == nil { + rec, ok, err := t.Add(dgst, nil, nil) + if err != nil { + return nil, err + } + if ok && rec != nil { + out = append(out, rec) + } + } + } + + // validate that all inputs are present + for dgst, links := range m { + for _, links := range links { + if len(links) == 0 { + out = nil + m[dgst] = nil + break + } + } + } + + for dgst, links := range m { + if len(links) == 0 { + continue + } + rec, ok, err := t.Add(dgst, links, nil) + if err != nil { + return nil, err + } + if !ok || rec == nil { + continue + } + out = append(out, rec) } - bkm[id] = rec - return rec, nil + + bkm[id] = out + return out, nil } type contextT string @@ -61,13 +98,13 @@ var ( ) func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt CacheExportOpt) ([]CacheExporterRecord, error) { - var bkm map[string]CacheExporterRecord + var bkm map[string][]CacheExporterRecord if bk := ctx.Value(backlinkKey); bk == nil { - bkm = map[string]CacheExporterRecord{} + bkm = map[string][]CacheExporterRecord{} ctx = context.WithValue(ctx, backlinkKey, bkm) } else { - bkm = bk.(map[string]CacheExporterRecord) + bkm = bk.(map[string][]CacheExporterRecord) } var res map[*exporter][]CacheExporterRecord @@ -77,23 +114,17 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach } else { res = r.(map[*exporter][]CacheExporterRecord) } - - if t.Visited(e) { - return res[e], nil + if v, ok := res[e]; ok { + return v, nil } - t.Visit(e) + res[e] = nil deps := e.k.Deps() - type expr struct { - r CacheExporterRecord - selector digest.Digest - } k := e.k.clone() // protect against *CacheKey internal ids mutation from other exports recKey := rootKey(k.Digest(), k.Output()) - rec := t.Add(recKey) - allRec := []CacheExporterRecord{rec} + results := []CacheExportResult{} addRecord := true @@ -112,10 +143,12 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach var remote *Remote var i int + mainCtx := ctx + if CacheOptGetterOf(ctx) == nil && e.recordCtxOpts != nil { + ctx = e.recordCtxOpts(ctx) + } v := e.record - for exportRecord && addRecord { - var variants []CacheExporterRecord if v == nil { if i < len(records) { v = records[i] @@ -144,9 +177,12 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach } if opt.CompressionOpt != nil { for _, r := range remotes { // record all remaining remotes as well - rec := t.Add(recKey) - rec.AddResult(k.vtx, int(k.output), v.CreatedAt, r) - variants = append(variants, rec) + results = append(results, CacheExportResult{ + CreatedAt: v.CreatedAt, + Result: r, + EdgeVertex: k.vtx, + EdgeIndex: k.output, + }) } } @@ -165,19 +201,24 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach } if opt.CompressionOpt != nil { for _, r := range remotes { // record all remaining remotes as well - rec := t.Add(recKey) - rec.AddResult(k.vtx, int(k.output), v.CreatedAt, r) - variants = append(variants, rec) + results = append(results, CacheExportResult{ + CreatedAt: v.CreatedAt, + Result: r, + EdgeVertex: k.vtx, + EdgeIndex: k.output, + }) } } } if remote != nil { - for _, rec := range allRec { - rec.AddResult(k.vtx, int(k.output), v.CreatedAt, remote) - } + results = append(results, CacheExportResult{ + CreatedAt: v.CreatedAt, + Result: remote, + EdgeVertex: k.vtx, + EdgeIndex: k.output, + }) } - allRec = append(allRec, variants...) break } @@ -185,44 +226,37 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach opt.Mode = CacheExportModeRemoteOnly } - srcs := make([][]expr, len(deps)) + srcs := make([][]CacheLink, len(deps)) for i, deps := range deps { for _, dep := range deps { - recs, err := dep.CacheKey.Exporter.ExportTo(ctx, t, opt) + rec, err := dep.CacheKey.Exporter.ExportTo(ctx, t, opt) if err != nil { - return nil, nil + return nil, err } - for _, r := range recs { - srcs[i] = append(srcs[i], expr{r: r, selector: dep.Selector}) + for _, r := range rec { + srcs[i] = append(srcs[i], CacheLink{Src: r, Selector: string(dep.Selector)}) } } } if e.edge != nil { for _, de := range e.edge.secondaryExporters { - recs, err := de.cacheKey.CacheKey.Exporter.ExportTo(ctx, t, opt) + recs, err := de.cacheKey.CacheKey.Exporter.ExportTo(mainCtx, t, opt) if err != nil { return nil, nil } for _, r := range recs { - srcs[de.index] = append(srcs[de.index], expr{r: r, selector: de.cacheKey.Selector}) + srcs[de.index] = append(srcs[de.index], CacheLink{Src: r, Selector: de.cacheKey.Selector.String()}) } } } - for _, rec := range allRec { - for i, srcs := range srcs { - for _, src := range srcs { - rec.LinkFrom(src.r, i, src.selector.String()) - } - } - - if !opt.IgnoreBacklinks { - for cm, id := range k.ids { - if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil { - return nil, err - } + if !opt.IgnoreBacklinks { + for cm, id := range k.ids { + _, err := addBacklinks(t, cm, id, bkm) + if err != nil { + return nil, err } } } @@ -234,16 +268,35 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach if id == key { return nil } - allRec = append(allRec, t.Add(digest.Digest(id))) - return nil + hasBacklinks := false + cm.backend.WalkBacklinks(id, func(id string, link CacheInfoLink) error { + hasBacklinks = true + return nil + }) + if hasBacklinks { + return nil + } + + dgst, err := digest.Parse(id) + if err != nil { + return nil + } + _, _, err = t.Add(dgst, nil, results) + return err }); err != nil { return nil, err } } - res[e] = allRec - - return allRec, nil + out, ok, err := t.Add(recKey, srcs, results) + if err != nil { + return nil, err + } + res[e] = []CacheExporterRecord{} + if ok { + res[e] = append(res[e], out) + } + return res[e], nil } func getBestResult(records []*CacheRecord) *CacheRecord { diff --git a/solver/jobs.go b/solver/jobs.go index cffe360afc6a..68558386a8d7 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -843,8 +843,8 @@ type cacheMapResp struct { type activeOp interface { CacheMap(context.Context, int) (*cacheMapResp, error) - LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) - Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error) + LoadCache(ctx context.Context, rec *CacheRecord) (Result, func(context.Context) context.Context, error) + Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, ctxOpts func(context.Context) context.Context, err error) IgnoreCache() bool Cache() CacheManager CalcSlowCache(context.Context, Index, PreprocessFunc, ResultBasedCacheFunc, Result) (digest.Digest, error) @@ -909,7 +909,7 @@ func (c cacheWithCacheOpts) Records(ctx context.Context, ck *CacheKey) ([]*Cache return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.st), ck) } -func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) { +func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, func(context.Context) context.Context, error) { ctx = progress.WithProgress(ctx, s.st.mpw) if s.st.mspan.Span != nil { ctx = trace.ContextWithSpan(ctx, s.st.mspan) @@ -921,7 +921,9 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec) tracing.FinishWithError(span, err) notifyCompleted(err, true) - return res, err + return res, func(ctx context.Context) context.Context { + return withAncestorCacheOpts(ctx, s.st) + }, err } // CalcSlowCache computes the digest of an input that is ready and has been @@ -1079,14 +1081,14 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, return &cacheMapResp{CacheMap: res[index], complete: s.cacheDone}, nil } -func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error) { +func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, ctxOpts func(context.Context) context.Context, err error) { defer func() { err = errdefs.WithOp(err, s.st.vtx.Sys(), s.st.vtx.Options().Description) err = errdefs.WrapVertex(err, s.st.origDigest) }() op, err := s.getOp() if err != nil { - return nil, nil, err + return nil, nil, nil, err } flightControlKey := "exec" res, err := s.gExecRes.Do(ctx, flightControlKey, func(ctx context.Context) (ret *execRes, retErr error) { @@ -1150,9 +1152,11 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, return s.execRes, nil }) if res == nil || err != nil { - return nil, nil, err + return nil, nil, nil, err } - return unwrapShared(res.execRes), res.execExporters, nil + return unwrapShared(res.execRes), res.execExporters, func(ctx context.Context) context.Context { + return withAncestorCacheOpts(ctx, s.st) + }, nil } func (s *sharedOp) getOp() (Op, error) { diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index 4a82f12a2b8b..ae8202f082f1 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/containerd/platforms" slsa02 "github.com/in-toto/in-toto-golang/in_toto/slsa_provenance/v0.2" @@ -514,43 +513,28 @@ type cacheExporter struct { m map[any]struct{} } -func (ce *cacheExporter) Add(dgst digest.Digest) solver.CacheExporterRecord { - return &cacheRecord{ - ce: ce, +func (ce *cacheExporter) Add(dgst digest.Digest, deps [][]solver.CacheLink, results []solver.CacheExportResult) (solver.CacheExporterRecord, bool, error) { + for _, res := range results { + if res.EdgeVertex == "" { + continue + } + e := edge{ + digest: res.EdgeVertex, + index: int(res.EdgeIndex), + } + descs := make([]ocispecs.Descriptor, len(res.Result.Descriptors)) + for i, desc := range res.Result.Descriptors { + d := desc + d.Annotations = containerimage.RemoveInternalLayerAnnotations(d.Annotations, true) + descs[i] = d + } + ce.layers[e] = appendLayerChain(ce.layers[e], descs) } -} - -func (ce *cacheExporter) Visit(target any) { - ce.m[target] = struct{}{} -} - -func (ce *cacheExporter) Visited(target any) bool { - _, ok := ce.m[target] - return ok + return &cacheRecord{}, true, nil } type cacheRecord struct { - ce *cacheExporter -} - -func (c *cacheRecord) AddResult(dgst digest.Digest, idx int, createdAt time.Time, result *solver.Remote) { - if result == nil || dgst == "" { - return - } - e := edge{ - digest: dgst, - index: idx, - } - descs := make([]ocispecs.Descriptor, len(result.Descriptors)) - for i, desc := range result.Descriptors { - d := desc - d.Annotations = containerimage.RemoveInternalLayerAnnotations(d.Annotations, true) - descs[i] = d - } - c.ce.layers[e] = appendLayerChain(c.ce.layers[e], descs) -} - -func (c *cacheRecord) LinkFrom(rec solver.CacheExporterRecord, index int, selector string) { + solver.CacheExporterRecordBase } func resolveRemotes(ctx context.Context, res solver.Result) ([]*solver.Remote, error) { diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 07bca32a4a9b..97c8a2b9761d 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -2120,15 +2120,13 @@ func TestCacheExporting(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - - require.Equal(t, 3, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 3, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) - require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 2, expTarget.records[0].links) + require.Equal(t, 1, expTarget.records[5].results) + require.Equal(t, 0, expTarget.records[0].links) require.Equal(t, 0, expTarget.records[1].links) - require.Equal(t, 0, expTarget.records[2].links) + require.Equal(t, 2, expTarget.records[5].links) j1, err := l.NewJob("j1") require.NoError(t, err) @@ -2151,15 +2149,14 @@ func TestCacheExporting(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() // the order of the records isn't really significant - require.Equal(t, 3, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 3, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) - require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 2, expTarget.records[0].links) + require.Equal(t, 1, expTarget.records[5].results) + require.Equal(t, 0, expTarget.records[0].links) require.Equal(t, 0, expTarget.records[1].links) - require.Equal(t, 0, expTarget.records[2].links) + require.Equal(t, 2, expTarget.records[5].links) } func TestCacheExportingModeMin(t *testing.T) { @@ -2208,17 +2205,16 @@ func TestCacheExportingModeMin(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(false)) require.NoError(t, err) - expTarget.normalize() - - require.Equal(t, 4, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 4, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) require.Equal(t, 0, expTarget.records[2].results) require.Equal(t, 0, expTarget.records[3].results) - require.Equal(t, 2, expTarget.records[0].links) - require.Equal(t, 1, expTarget.records[1].links) - require.Equal(t, 0, expTarget.records[2].links) - require.Equal(t, 0, expTarget.records[3].links) + require.Equal(t, 1, expTarget.records[7].results) + require.Equal(t, 0, expTarget.records[0].links) + require.Equal(t, 0, expTarget.records[1].links) + require.Equal(t, 1, expTarget.records[2].links) + require.Equal(t, 2, expTarget.records[7].links) j1, err := l.NewJob("j1") require.NoError(t, err) @@ -2241,17 +2237,18 @@ func TestCacheExportingModeMin(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(false)) require.NoError(t, err) - expTarget.normalize() // the order of the records isn't really significant - require.Equal(t, 4, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 4, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 0, expTarget.records[3].results) - require.Equal(t, 2, expTarget.records[0].links) - require.Equal(t, 1, expTarget.records[1].links) - require.Equal(t, 0, expTarget.records[2].links) - require.Equal(t, 0, expTarget.records[3].links) + require.Equal(t, 1, expTarget.records[7].results) + require.Equal(t, 0, expTarget.records[0].links) + require.Equal(t, 0, expTarget.records[1].links) + require.Equal(t, 1, expTarget.records[2].links) + require.Equal(t, 1, expTarget.records[3].links) + require.Equal(t, 2, expTarget.records[6].links) + require.Equal(t, 2, expTarget.records[7].links) // one more check with all mode j2, err := l.NewJob("j2") @@ -2275,17 +2272,17 @@ func TestCacheExportingModeMin(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() // the order of the records isn't really significant - require.Equal(t, 4, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) - require.Equal(t, 1, expTarget.records[1].results) + require.Equal(t, 4, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) + require.Equal(t, 0, expTarget.records[1].results) require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 0, expTarget.records[3].results) - require.Equal(t, 2, expTarget.records[0].links) - require.Equal(t, 1, expTarget.records[1].links) - require.Equal(t, 0, expTarget.records[2].links) - require.Equal(t, 0, expTarget.records[3].links) + require.Equal(t, 1, expTarget.records[3].results) + require.Equal(t, 1, expTarget.records[7].results) + require.Equal(t, 0, expTarget.records[0].links) + require.Equal(t, 0, expTarget.records[1].links) + require.Equal(t, 1, expTarget.records[2].links) + require.Equal(t, 2, expTarget.records[6].links) } func TestSlowCacheAvoidAccess(t *testing.T) { @@ -2577,8 +2574,7 @@ func TestCacheMultipleMaps(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - require.Equal(t, 3, len(expTarget.records)) + require.Equal(t, 3, expTarget.numUniqueRecords()) j1, err := l.NewJob("j1") require.NoError(t, err) @@ -2613,7 +2609,7 @@ func TestCacheMultipleMaps(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - require.Equal(t, 3, len(expTarget.records)) + require.Equal(t, 3, expTarget.numUniqueRecords()) require.Equal(t, false, called) j2, err := l.NewJob("j2") @@ -2648,7 +2644,7 @@ func TestCacheMultipleMaps(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - require.Equal(t, 3, len(expTarget.records)) + require.Equal(t, 3, expTarget.numUniqueRecords()) require.Equal(t, true, called) } @@ -2699,8 +2695,7 @@ func TestCacheInputMultipleMaps(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - require.Equal(t, 3, len(expTarget.records)) + require.Equal(t, 3, expTarget.numUniqueRecords()) require.NoError(t, j0.Discard()) j0 = nil @@ -2738,8 +2733,7 @@ func TestCacheInputMultipleMaps(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - require.Equal(t, 3, len(expTarget.records)) + require.Equal(t, 3, expTarget.numUniqueRecords()) require.NoError(t, j1.Discard()) j1 = nil @@ -2799,14 +2793,15 @@ func TestCacheExportingPartialSelector(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - require.Equal(t, 3, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 3, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 2, expTarget.records[0].links) + require.Equal(t, 1, expTarget.records[5].results) + require.Equal(t, 0, expTarget.records[0].links) require.Equal(t, 0, expTarget.records[1].links) require.Equal(t, 0, expTarget.records[2].links) + require.Equal(t, 1, expTarget.records[5].links) // repeat so that all coming from cache are retained j1, err := l.NewJob("j1") @@ -2832,16 +2827,14 @@ func TestCacheExportingPartialSelector(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - // the order of the records isn't really significant - require.Equal(t, 3, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 3, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) - require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 2, expTarget.records[0].links) + require.Equal(t, 1, expTarget.records[4].results) + require.Equal(t, 0, expTarget.records[0].links) require.Equal(t, 0, expTarget.records[1].links) - require.Equal(t, 0, expTarget.records[2].links) + require.Equal(t, 1, expTarget.records[4].links) // repeat with forcing a slow key recomputation j2, err := l.NewJob("j2") @@ -2886,19 +2879,16 @@ func TestCacheExportingPartialSelector(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - // the order of the records isn't really significant // adds one - require.Equal(t, 4, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) + require.Equal(t, 4, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) require.Equal(t, 0, expTarget.records[1].results) - require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 0, expTarget.records[3].results) - require.Equal(t, 3, expTarget.records[0].links) + require.Equal(t, 1, expTarget.records[6].results) + require.Equal(t, 0, expTarget.records[0].links) require.Equal(t, 0, expTarget.records[1].links) require.Equal(t, 0, expTarget.records[2].links) - require.Equal(t, 0, expTarget.records[3].links) + require.Equal(t, 1, expTarget.records[6].links) // repeat with a wrapper j3, err := l.NewJob("j3") @@ -2932,21 +2922,19 @@ func TestCacheExportingPartialSelector(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - // adds one extra result // the order of the records isn't really significant - require.Equal(t, 5, len(expTarget.records)) - require.Equal(t, 1, expTarget.records[0].results) - require.Equal(t, 1, expTarget.records[1].results) + require.Equal(t, 5, expTarget.numUniqueRecords()) + require.Equal(t, 0, expTarget.records[0].results) + require.Equal(t, 0, expTarget.records[1].results) require.Equal(t, 0, expTarget.records[2].results) - require.Equal(t, 0, expTarget.records[3].results) - require.Equal(t, 0, expTarget.records[4].results) - require.Equal(t, 1, expTarget.records[0].links) - require.Equal(t, 3, expTarget.records[1].links) + require.Equal(t, 1, expTarget.records[5].results) + require.Equal(t, 1, expTarget.records[7].results) + require.Equal(t, 0, expTarget.records[0].links) + require.Equal(t, 0, expTarget.records[1].links) require.Equal(t, 0, expTarget.records[2].links) - require.Equal(t, 0, expTarget.records[3].links) - require.Equal(t, 0, expTarget.records[4].links) + require.Equal(t, 1, expTarget.records[5].links) + require.Equal(t, 1, expTarget.records[7].links) } func TestCacheExportingMergedKey(t *testing.T) { @@ -3030,9 +3018,7 @@ func TestCacheExportingMergedKey(t *testing.T) { _, err = res.CacheKeys()[0].Exporter.ExportTo(ctx, expTarget, testExporterOpts(true)) require.NoError(t, err) - expTarget.normalize() - - require.Equal(t, 5, len(expTarget.records)) + require.Equal(t, 5, expTarget.numUniqueRecords()) } // moby/buildkit#434 @@ -4084,57 +4070,23 @@ type testExporterTarget struct { records []*testExporterRecord } -func (t *testExporterTarget) Add(dgst digest.Digest) CacheExporterRecord { - r := &testExporterRecord{dgst: dgst} - t.records = append(t.records, r) - return r -} - -func (t *testExporterTarget) Visit(v any) { - t.visited[v] = struct{}{} -} - -func (t *testExporterTarget) Visited(v any) bool { - _, ok := t.visited[v] - return ok -} - -func (t *testExporterTarget) normalize() { - m := map[digest.Digest]struct{}{} - rec := make([]*testExporterRecord, 0, len(t.records)) +func (t *testExporterTarget) numUniqueRecords() int { + unique := make(map[digest.Digest]struct{}) for _, r := range t.records { - if _, ok := m[r.dgst]; ok { - for _, r2 := range t.records { - delete(r2.linkMap, r.dgst) - r2.links = len(r2.linkMap) - } - continue - } - m[r.dgst] = struct{}{} - rec = append(rec, r) + unique[r.dgst] = struct{}{} } - t.records = rec + return len(unique) +} + +func (t *testExporterTarget) Add(dgst digest.Digest, links [][]CacheLink, results []CacheExportResult) (CacheExporterRecord, bool, error) { + r := &testExporterRecord{dgst: dgst, results: len(results), links: len(links)} + t.records = append(t.records, r) + return r, true, nil } type testExporterRecord struct { + CacheExporterRecordBase dgst digest.Digest results int links int - linkMap map[digest.Digest]struct{} -} - -func (r *testExporterRecord) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *Remote) { - r.results++ -} - -func (r *testExporterRecord) LinkFrom(src CacheExporterRecord, index int, selector string) { - if s, ok := src.(*testExporterRecord); ok { - if r.linkMap == nil { - r.linkMap = map[digest.Digest]struct{}{} - } - if _, ok := r.linkMap[s.dgst]; !ok { - r.linkMap[s.dgst] = struct{}{} - r.links++ - } - } } diff --git a/solver/types.go b/solver/types.go index 3b0df74c87b0..1b8443dee1b1 100644 --- a/solver/types.go +++ b/solver/types.go @@ -124,20 +124,29 @@ type CacheExporter interface { // CacheExporterTarget defines object capable of receiving exports type CacheExporterTarget interface { - // Add creates a new object record that we can then add results to and - // connect to other records. - Add(dgst digest.Digest) CacheExporterRecord - - // Visit marks a target as having been visited. - Visit(target any) - // Vistited returns true if a target has previously been marked as visited. - Visited(target any) bool + Add(dgst digest.Digest, deps [][]CacheLink, results []CacheExportResult) (CacheExporterRecord, bool, error) } -// CacheExporterRecord is a single object being exported +// opaque interface type CacheExporterRecord interface { - AddResult(vtx digest.Digest, index int, createdAt time.Time, result *Remote) - LinkFrom(src CacheExporterRecord, index int, selector string) + isCacheExporterRecord() +} + +type CacheExporterRecordBase struct { +} + +func (c *CacheExporterRecordBase) isCacheExporterRecord() {} + +type CacheLink struct { + Src CacheExporterRecord + Selector string +} + +type CacheExportResult struct { + CreatedAt time.Time + Result *Remote + EdgeVertex digest.Digest + EdgeIndex Index } // Remote is a descriptor or a list of stacked descriptors that can be pulled @@ -148,15 +157,6 @@ type Remote struct { Provider content.InfoReaderProvider } -// CacheLink is a link between two cache records -type CacheLink struct { - Source digest.Digest `json:",omitempty"` - Input Index `json:",omitempty"` - Output Index `json:",omitempty"` - Base digest.Digest `json:",omitempty"` - Selector digest.Digest `json:",omitempty"` -} - type ReleaseFunc func() // Op defines how the solver can evaluate the properties of a vertex operation.