diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4807f8de9..6caed3265 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:
 
 ### Added
 
+- `provider`: Add ability to clear provide queue [#978](https://github.com/ipfs/boxo/pull/978)
+
 ### Changed
 
 ### Removed
diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go
index da61cd6b3..f82f59a06 100644
--- a/provider/internal/queue/queue.go
+++ b/provider/internal/queue/queue.go
@@ -57,6 +57,7 @@ type Queue struct {
 	dequeue   chan cid.Cid
 	ds        datastore.Batching
 	enqueue   chan cid.Cid
+	clear     chan chan<- int
 }
 
 // New creates a queue for cids.
@@ -69,6 +70,7 @@ func New(ds datastore.Batching) *Queue {
 		dequeue: make(chan cid.Cid),
 		ds:      namespace.Wrap(ds, datastore.NewKey("/queue")),
 		enqueue: make(chan cid.Cid),
+		clear:   make(chan chan<- int),
 	}
 
 	go q.worker(ctx)
@@ -112,6 +114,22 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
 	return q.dequeue
 }
 
+// Clear clears all queued records from memory and the datastore. Returns the
+// number of CIDs removed from the queue.
+//
+// The request is handed to the worker goroutine over an unbuffered channel
+// and Clear waits for the worker's reply, so the call blocks until the queue
+// has been cleared.
+//
+// NOTE(review): Clear has no way to give up if the worker has already
+// returned (it exits on ctx.Done()); the send would then presumably block
+// forever. Confirm that callers never invoke Clear after Close.
+func (q *Queue) Clear() int {
+	rsp := make(chan int)
+	q.clear <- rsp
+	return <-rsp
+}
+
 func makeCidString(c cid.Cid) string {
 	data := c.Bytes()
 	if len(data) > 4 {
@@ -249,6 +267,26 @@ func (q *Queue) worker(ctx context.Context) {
 
 		case <-ctx.Done():
 			return
+
+		case rsp := <-q.clear:
+			var rmMemCount int
+			if c != cid.Undef {
+				rmMemCount = 1
+			}
+			c = cid.Undef
+			k = datastore.Key{}
+			idle = false
+			rmMemCount += inBuf.Len()
+			inBuf.Clear()
+			dedupCache.Purge()
+			rmDSCount, err := q.clearDatastore(ctx)
+			if err != nil {
+				log.Errorw("provider queue: cannot clear datastore", "err", err)
+			} else {
+				dsEmpty = true
+			}
+			log.Infow("cleared provider queue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount)
+			rsp <- rmMemCount + rmDSCount
 		}
 
 		if commit {
@@ -267,6 +305,57 @@ func (q *Queue) worker(ctx context.Context) {
 	}
 }
 
+// clearDatastore deletes every key returned by a keys-only query of q.ds,
+// committing the deletions in batches of at most batchSize and syncing the
+// datastore at the end. It returns the number of keys deleted. On any error,
+// or if ctx is canceled, it returns 0 and the error, even though batches
+// committed earlier in the loop are not undone here.
+func (q *Queue) clearDatastore(ctx context.Context) (int, error) {
+	qry := query.Query{
+		KeysOnly: true,
+	}
+	results, err := q.ds.Query(ctx, qry)
+	if err != nil {
+		return 0, fmt.Errorf("cannot query datastore: %w", err)
+	}
+	defer results.Close()
+
+	batch, err := q.ds.Batch(ctx)
+	if err != nil {
+		return 0, fmt.Errorf("cannot create datastore batch: %w", err)
+	}
+
+	var rmCount, writeCount int
+	for result := range results.Next() {
+		if ctx.Err() != nil {
+			return 0, ctx.Err()
+		}
+		if writeCount >= batchSize {
+			writeCount = 0
+			if err = batch.Commit(ctx); err != nil {
+				return 0, fmt.Errorf("cannot commit datastore updates: %w", err)
+			}
+		}
+		if result.Error != nil {
+			return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
+		}
+		if err = batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
+			return 0, fmt.Errorf("cannot delete key from datastore: %w", err)
+		}
+		rmCount++
+		writeCount++
+	}
+
+	if err = batch.Commit(ctx); err != nil {
+		return 0, fmt.Errorf("cannot commit datastore updates: %w", err)
+	}
+	if err = q.ds.Sync(ctx, datastore.NewKey("")); err != nil {
+		return 0, fmt.Errorf("cannot sync datastore: %w", err)
+	}
+
+	return rmCount, nil
+}
+
 func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) {
 	qry := query.Query{
 		Orders: []query.Order{query.OrderByKey{}},
@@ -301,6 +390,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq
 		}
 		counter++
 	}
+
 	cids.Clear()
 
 	if err = b.Commit(ctx); err != nil {
diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go
index be2874669..c280a9ff1 100644
--- a/provider/internal/queue/queue_test.go
+++ b/provider/internal/queue/queue_test.go
@@ -136,3 +136,51 @@ func TestDeduplicateCids(t *testing.T) {
 
 	assertOrdered(cids, queue, t)
 }
+
+// TestClear checks that Clear reports both the entries persisted by a
+// previous queue instance and the newly enqueued ones, and that a queue
+// created afterwards over the same datastore has nothing to dequeue.
+func TestClear(t *testing.T) {
+	const cidCount = 25
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	queue := New(ds)
+	defer queue.Close()
+
+	for _, c := range random.Cids(cidCount) {
+		queue.Enqueue(c)
+	}
+
+	// Cause queued entries to be saved in datastore.
+	err := queue.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	queue = New(ds)
+	defer queue.Close()
+
+	for _, c := range random.Cids(cidCount) {
+		queue.Enqueue(c)
+	}
+
+	rmCount := queue.Clear()
+	t.Log("Cleared", rmCount, "entries from provider queue")
+	if rmCount != 2*cidCount {
+		t.Fatalf("expected %d cleared, got %d", 2*cidCount, rmCount)
+	}
+
+	if err = queue.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure no data when creating new queue.
+	queue = New(ds)
+	defer queue.Close()
+
+	select {
+	case <-queue.Dequeue():
+		t.Fatal("dequeue should not return")
+	case <-time.After(10 * time.Millisecond):
+	}
+}
diff --git a/provider/noop.go b/provider/noop.go
index 50c3e3502..4da18da3b 100644
--- a/provider/noop.go
+++ b/provider/noop.go
@@ -15,6 +15,11 @@ func NewNoopProvider() System {
 	return &noopProvider{}
 }
 
+// Clear is a no-op and always returns 0.
+func (op *noopProvider) Clear() int {
+	return 0
+}
+
 func (op *noopProvider) Close() error {
 	return nil
 }
diff --git a/provider/provider.go b/provider/provider.go
index c9b3b3dec..c0573d5d8 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -34,6 +34,9 @@ type Reprovider interface {
 // System defines the interface for interacting with the value
 // provider system
 type System interface {
+	// Clear removes all entries from the provide queue. Returns the number of
+	// CIDs removed from the queue.
+	Clear() int
 	Close() error
 	Stat() (ReproviderStats, error)
 	Provider
diff --git a/provider/reprovider.go b/provider/reprovider.go
index 100aee456..d8ed51296 100644
--- a/provider/reprovider.go
+++ b/provider/reprovider.go
@@ -399,6 +399,12 @@ func parseTime(b []byte) (time.Time, error) {
 	return time.Unix(0, tns), nil
 }
 
+// Clear removes all entries from the provide queue. Returns the number of CIDs
+// removed from the queue.
+func (s *reprovider) Clear() int {
+	return s.q.Clear()
+}
+
 func (s *reprovider) Close() error {
 	s.close()
 	err := s.q.Close()