From df56dcd13f49f29675073db5843ca0bfc79dda8a Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 9 Jul 2025 15:12:32 -1000 Subject: [PATCH 1/7] provider: add ability co clear provider queue --- provider/internal/queue/queue.go | 76 +++++++++++++++++++++++++++ provider/internal/queue/queue_test.go | 45 ++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index da61cd6b3..58d91594c 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -57,6 +57,7 @@ type Queue struct { dequeue chan cid.Cid ds datastore.Batching enqueue chan cid.Cid + clear chan chan int } // New creates a queue for cids. @@ -69,6 +70,7 @@ func New(ds datastore.Batching) *Queue { dequeue: make(chan cid.Cid), ds: namespace.Wrap(ds, datastore.NewKey("/queue")), enqueue: make(chan cid.Cid), + clear: make(chan chan int), } go q.worker(ctx) @@ -112,6 +114,13 @@ func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } +// Clear clears all queued records from memory and the datastore. +func (q *Queue) Clear() int { + rsp := make(chan int) + q.clear <- rsp + return <-rsp +} + func makeCidString(c cid.Cid) string { data := c.Bytes() if len(data) > 4 { @@ -249,6 +258,26 @@ func (q *Queue) worker(ctx context.Context) { case <-ctx.Done(): return + + case rsp := <-q.clear: + var rmMemCount int + if c != cid.Undef { + rmMemCount = 1 + } + c = cid.Undef + k = datastore.Key{} + idle = false + rmMemCount += inBuf.Len() + inBuf.Clear() + dedupCache.Purge() + rmDSCount, err := q.clearDatastore(ctx) + if err != nil { + log.Errorw("provider queue: cannot clear datastore", "err", err) + } else { + dsEmpty = true + } + log.Infow("cleared provider queue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount) + rsp <- rmMemCount + rmDSCount } if commit { @@ -267,6 +296,52 @@ func (q *Queue) worker(ctx context.Context) { } } +func (q *Queue) clearDatastore(ctx context.Context) (int, error) { + qry := query.Query{ + KeysOnly: true, + } + results, err := q.ds.Query(ctx, qry) + if err != nil { + return 0, fmt.Errorf("cannot query datastore: %w", err) + } + defer results.Close() + + batch, err := q.ds.Batch(ctx) + if err != nil { + return 0, fmt.Errorf("cannot create datastore batch: %w", err) + } + + var rmCount, writeCount int + for result := range results.Next() { + if ctx.Err() != nil { + return 0, ctx.Err() + } + if writeCount >= batchSize { + writeCount = 0 + if err = batch.Commit(ctx); err != nil { + return 0, fmt.Errorf("cannot commit datastore updates: %w", err) + } + } + if result.Error != nil { + return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error) + } + if err = batch.Delete(ctx, datastore.NewKey(result.Entry.Key)); err != nil { + return 0, fmt.Errorf("cannot delete key from datastore: %w", err) + } + rmCount++ + writeCount++ + } + + if err = batch.Commit(ctx); err != nil { + return 0, fmt.Errorf("cannot commit datastore updated: %w", err) + } + if err = q.ds.Sync(ctx, datastore.NewKey("")); err != nil { + return 0, fmt.Errorf("cannot sync datastore: %w", err) + } + + return rmCount, nil +} + func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) { qry := query.Query{ Orders: []query.Order{query.OrderByKey{}}, @@ -301,6 +376,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq } counter++ } + cids.Clear() if err = b.Commit(ctx); err != nil { diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index be2874669..c280a9ff1 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -136,3 +136,48 @@ func TestDeduplicateCids(t *testing.T) { assertOrdered(cids, queue, t) } + +func TestClear(t *testing.T) { + const cidCount = 25 + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue := New(ds) + defer queue.Close() + + for _, c := range random.Cids(cidCount) { + queue.Enqueue(c) + } + + // Cause queued entried to be saved in datastore. + err := queue.Close() + if err != nil { + t.Fatal(err) + } + + queue = New(ds) + defer queue.Close() + + for _, c := range random.Cids(cidCount) { + queue.Enqueue(c) + } + + rmCount := queue.Clear() + t.Log("Cleared", rmCount, "entries from provider queue") + if rmCount != 2*cidCount { + t.Fatalf("expected %d cleared, got %d", 2*cidCount, rmCount) + } + + if err = queue.Close(); err != nil { + t.Fatal(err) + } + + // Ensure no data when creating new queue. + queue = New(ds) + defer queue.Close() + + select { + case <-queue.Dequeue(): + t.Fatal("dequeue should not return") + case <-time.After(10 * time.Millisecond): + } +} From 1a44bb7bbbb26ff93a57074f922ef80d591d7ae1 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 9 Jul 2025 16:06:30 -1000 Subject: [PATCH 2/7] Add Clear method to system API --- provider/noop.go | 4 ++++ provider/provider.go | 1 + provider/reprovider.go | 5 +++++ 3 files changed, 10 insertions(+) diff --git a/provider/noop.go b/provider/noop.go index 50c3e3502..4da18da3b 100644 --- a/provider/noop.go +++ b/provider/noop.go @@ -15,6 +15,10 @@ func NewNoopProvider() System { return &noopProvider{} } +func (op *noopProvider) Clear() int { + return 0 +} + func (op *noopProvider) Close() error { return nil } diff --git a/provider/provider.go b/provider/provider.go index c9b3b3dec..79cb835a9 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -34,6 +34,7 @@ type Reprovider interface { // System defines the interface for interacting with the value // provider system type System interface { + Clear() int Close() error Stat() (ReproviderStats, error) Provider diff --git a/provider/reprovider.go b/provider/reprovider.go index 100aee456..1fddadebb 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -399,6 +399,11 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } +// Clear removes all entries from the reprovider queue. +func (s *reprovider) Clear() int { + return s.q.Clear() +} + func (s *reprovider) Close() error { s.close() err := s.q.Close() From 36f095573c82801bb634cb209dfd2b132b9e6148 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 9 Jul 2025 16:28:59 -1000 Subject: [PATCH 3/7] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4807f8de9..c058e9c10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes: ### Added +- `provider`: Add ability to clear (re)provide queue [#978](https://github.com/ipfs/boxo/pull/978) + ### Changed ### Removed From 23a66f073c43f39f5803d16e68df67a945126c50 Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Thu, 10 Jul 2025 01:20:52 -0700 Subject: [PATCH 4/7] Update provider/internal/queue/queue.go Co-authored-by: Guillaume Michel --- provider/internal/queue/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 58d91594c..f9fc0b13a 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -70,7 +70,7 @@ func New(ds datastore.Batching) *Queue { dequeue: make(chan cid.Cid), ds: namespace.Wrap(ds, datastore.NewKey("/queue")), enqueue: make(chan cid.Cid), - clear: make(chan chan int), + clear: make(chan chan<- int), } go q.worker(ctx) From c442467e501cb6a96f24faca12f4a3dee5b06dc3 Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Thu, 10 Jul 2025 01:20:59 -0700 Subject: [PATCH 5/7] Update provider/internal/queue/queue.go Co-authored-by: Guillaume Michel --- provider/internal/queue/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index f9fc0b13a..ed2920240 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -57,7 +57,7 @@ type Queue struct { dequeue chan cid.Cid ds datastore.Batching enqueue chan cid.Cid - clear chan chan int + clear chan chan<- int } // New creates a queue for cids. From 603c2877fddec42fba687e4dc937831ebd73d669 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 9 Jul 2025 22:27:24 -1000 Subject: [PATCH 6/7] review changes --- provider/internal/queue/queue.go | 5 +++-- provider/provider.go | 2 ++ provider/reprovider.go | 3 ++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index ed2920240..f82f59a06 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -114,7 +114,8 @@ func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } -// Clear clears all queued records from memory and the datastore. +// Clear clears all queued records from memory and the datastore. Returns the +// number of CIDs removed from the queue. func (q *Queue) Clear() int { rsp := make(chan int) q.clear <- rsp @@ -325,7 +326,7 @@ func (q *Queue) clearDatastore(ctx context.Context) (int, error) { if result.Error != nil { return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error) } - if err = batch.Delete(ctx, datastore.NewKey(result.Entry.Key)); err != nil { + if err = batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil { return 0, fmt.Errorf("cannot delete key from datastore: %w", err) } rmCount++ diff --git a/provider/provider.go b/provider/provider.go index 79cb835a9..c0573d5d8 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -34,6 +34,8 @@ type Reprovider interface { // System defines the interface for interacting with the value // provider system type System interface { + // Clear removes all entries from the provide queue. Returns the number of + // CIDs removed from the queue. Clear() int Close() error Stat() (ReproviderStats, error) diff --git a/provider/reprovider.go b/provider/reprovider.go index 1fddadebb..d8ed51296 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -399,7 +399,8 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } -// Clear removes all entries from the reprovider queue. +// Clear removes all entries from the provide queue. Returns the number of CIDs +// removed from the queue. func (s *reprovider) Clear() int { return s.q.Clear() } From 20e8e90f05d6f5211456b405f226eb39efa3a619 Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Thu, 10 Jul 2025 01:28:05 -0700 Subject: [PATCH 7/7] Update CHANGELOG.md Co-authored-by: Guillaume Michel --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c058e9c10..6caed3265 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ The following emojis are used to highlight certain changes: ### Added -- `provider`: Add ability to clear (re)provide queue [#978](https://github.com/ipfs/boxo/pull/978) +- `provider`: Add ability to clear provide queue [#978](https://github.com/ipfs/boxo/pull/978) ### Changed