Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/couchbase/moss v0.2.0
github.com/spf13/cobra v1.8.1
go.etcd.io/bbolt v1.4.0
golang.org/x/sys v0.29.0
golang.org/x/text v0.8.0
google.golang.org/protobuf v1.36.6
)
Expand All @@ -42,5 +43,10 @@ require (
github.com/json-iterator/go v0.0.0-20171115153421-f7279a603ede // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
golang.org/x/sys v0.29.0 // indirect
)

// Use the ajroetker fork of bleve_index_api, which adds VFS support
replace github.com/blevesearch/bleve_index_api => github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01

// Use VFS-enabled zapx from ajroetker fork
replace github.com/blevesearch/zapx/v16 => github.com/ajroetker/zapx/v16 v16.0.0-20251111234330-70822381ed85
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2JW2gggRdg=
github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01 h1:SbGoS4vY5GDtDwxKy4iT+s0LsEBQMKd/FNRwTCnWssI=
github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/ajroetker/zapx/v16 v16.0.0-20251111234330-70822381ed85 h1:pY9/pLIPBX6JMYI9DzultU8yybMBVaqzN8ceU/dQurQ=
github.com/ajroetker/zapx/v16 v16.0.0-20251111234330-70822381ed85/go.mod h1:7y0yPdM9JLW29eRvgtmgaxujU4t0CUfzo1sFvP1lLss=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blevesearch/bleve_index_api v1.2.11 h1:bXQ54kVuwP8hdrXUSOnvTQfgK0KI1+f9A0ITJT8tX1s=
github.com/blevesearch/bleve_index_api v1.2.11/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/blevesearch/geo v0.2.4 h1:ECIGQhw+QALCZaDcogRTNSJYQXRtC8/m8IKiA706cqk=
github.com/blevesearch/geo v0.2.4/go.mod h1:K56Q33AzXt2YExVHGObtmRSFYZKYGv0JEN5mdacJJR8=
github.com/blevesearch/go-faiss v1.0.26 h1:4dRLolFgjPyjkaXwff4NfbZFdE/dfywbzDqporeQvXI=
Expand Down Expand Up @@ -44,8 +46,6 @@ github.com/blevesearch/zapx/v14 v14.4.2 h1:2SGHakVKd+TrtEqpfeq8X+So5PShQ5nW6GNxT
github.com/blevesearch/zapx/v14 v14.4.2/go.mod h1:rz0XNb/OZSMjNorufDGSpFpjoFKhXmppH9Hi7a877D8=
github.com/blevesearch/zapx/v15 v15.4.2 h1:sWxpDE0QQOTjyxYbAVjt3+0ieu8NCE0fDRaFxEsp31k=
github.com/blevesearch/zapx/v15 v15.4.2/go.mod h1:1pssev/59FsuWcgSnTa0OeEpOzmhtmr/0/11H0Z8+Nw=
github.com/blevesearch/zapx/v16 v16.2.7 h1:xcgFRa7f/tQXOwApVq7JWgPYSlzyUMmkuYa54tMDuR0=
github.com/blevesearch/zapx/v16 v16.2.7/go.mod h1:murSoCJPCk25MqURrcJaBQ1RekuqSCSfMjXH4rHyA14=
github.com/couchbase/ghistogram v0.1.0 h1:b95QcQTCzjTUocDXp/uMgSNQi8oj1tGwnJ4bODWZnps=
github.com/couchbase/ghistogram v0.1.0/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k=
github.com/couchbase/moss v0.2.0 h1:VCYrMzFwEryyhRSeI+/b3tRBSeTpi/8gn5Kf6dxqn+o=
Expand Down
64 changes: 56 additions & 8 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
go cw.listen()

for _, task := range resultMergePlan.Tasks {
// Check if context was cancelled before starting next task
select {
case <-cw.cancelCh:
return segment.ErrClosed
default:
}

if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
continue
Expand Down Expand Up @@ -354,14 +361,26 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
if len(segmentsToMerge) > 0 {
filename = zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename

fileMergeZapStartTime := time.Now()

atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
prevBytesReadTotal := cumulateBytesRead(segmentsToMerge)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s)

// Try VFS-aware plugin first, fall back to legacy path-based
var newDocNums [][]uint64
var err error
if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil {
// Use VFS-aware merge with relative filename
newDocNums, _, err = vfsPlugin.MergeVFS(s.vfsDir, filename,
segmentsToMerge, docsToDrop, cw.cancelCh, s)
} else {
// Legacy path-based merge
path := s.path + string(os.PathSeparator) + filename
newDocNums, _, err = s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s)
}

atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)

fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
Expand All @@ -379,7 +398,15 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
return fmt.Errorf("merging failed: %v", err)
}

seg, err = s.segPlugin.Open(path)
// Open the newly merged segment
if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil {
// Use VFS-aware open with relative filename
seg, err = vfsPlugin.OpenVFS(s.vfsDir, filename)
} else {
// Legacy path-based open
path := s.path + string(os.PathSeparator) + filename
seg, err = s.segPlugin.Open(path)
}
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
Expand Down Expand Up @@ -523,27 +550,48 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot,
defer wg.Done()
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
filename := zapFileName(newSegmentID)
path := s.path + string(os.PathSeparator) + filename

// the newly merged segment is already flushed out to disk, just needs
// to be opened using mmap.
newDocIDs, _, err :=
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
var newDocIDs [][]uint64
var err error

// Try VFS-aware plugin first, fall back to legacy path-based
if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil {
// Use VFS-aware merge with relative filename
newDocIDs, _, err = vfsPlugin.MergeVFS(s.vfsDir, filename,
segsBatch, dropsBatch, s.closeCh, s)
} else {
// Legacy path-based merge
path := s.path + string(os.PathSeparator) + filename
newDocIDs, _, err = s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
}

if err != nil {
em.Lock()
errs = append(errs, err)
em.Unlock()
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return
}

// to prevent accidental cleanup of this newly created file, mark it
// as ineligible for removal. this will be flipped back when the bolt
// is updated - which is valid, since the snapshot updated in bolt is
// cleaned up only if its zero ref'd (MB-66163 for more details)
s.markIneligibleForRemoval(filename)
newMergedSegmentIDs[id] = newSegmentID
newDocIDsSet[id] = newDocIDs
newMergedSegments[id], err = s.segPlugin.Open(path)

// Open the newly merged segment
if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil {
// Use VFS-aware open with relative filename
newMergedSegments[id], err = vfsPlugin.OpenVFS(s.vfsDir, filename)
} else {
// Legacy path-based open
path := s.path + string(os.PathSeparator) + filename
newMergedSegments[id], err = s.segPlugin.Open(path)
}
if err != nil {
em.Lock()
errs = append(errs, err)
Expand Down
Loading