Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.13.0 // indirect
)

// Use bleve_index_api branch with VFS support
replace github.com/blevesearch/bleve_index_api => github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2JW2gggRdg=
github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01 h1:SbGoS4vY5GDtDwxKy4iT+s0LsEBQMKd/FNRwTCnWssI=
github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blevesearch/bleve_index_api v1.2.11 h1:bXQ54kVuwP8hdrXUSOnvTQfgK0KI1+f9A0ITJT8tX1s=
github.com/blevesearch/bleve_index_api v1.2.11/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/blevesearch/go-faiss v1.0.26 h1:4dRLolFgjPyjkaXwff4NfbZFdE/dfywbzDqporeQvXI=
github.com/blevesearch/go-faiss v1.0.26/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
Expand Down
151 changes: 151 additions & 0 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@

package zap

import (
"bufio"
"fmt"
"os"

"github.com/RoaringBitmap/roaring/v2"
"github.com/blevesearch/bleve_index_api/vfs"
mmap "github.com/blevesearch/mmap-go"
segment "github.com/blevesearch/scorch_segment_api/v2"
"github.com/blevesearch/vellum"
)

// ZapPlugin implements the Plugin interface of the
// blevesearch/scorch_segment_api package.
// It is an empty struct: it carries no fields of its own.
type ZapPlugin struct{}
Expand All @@ -25,3 +37,142 @@ func (*ZapPlugin) Type() string {
// Version returns the package-level Version value.
func (*ZapPlugin) Version() uint32 {
return Version
}

// OpenVFS opens the segment file called name from dir and returns it as a
// memory-mapped Segment.
//
// The reader obtained from dir.OpenAt must report a non-zero file descriptor
// and must be a *vfs.FileReaderAtCloser, because the mapping is created from
// its underlying *os.File. Empty files are rejected before mapping. Whenever
// a step fails, the resource acquired so far (the reader, or the partially
// built segment) is closed before the error is returned.
func (*ZapPlugin) OpenVFS(dir vfs.Directory, name string) (segment.Segment, error) {
	rac, err := dir.OpenAt(name)
	if err != nil {
		return nil, fmt.Errorf("VFS openAt %s: %w", name, err)
	}

	// abort releases the reader and hands the failure back to the caller.
	abort := func(cause error) (segment.Segment, error) {
		rac.Close()
		return nil, cause
	}

	// A zero descriptor is treated as "no descriptor available".
	if rac.AsFd() == 0 {
		return abort(fmt.Errorf("VFS does not provide file descriptor for mmap (not file-backed)"))
	}

	// The mapping needs the concrete *os.File held by FileReaderAtCloser.
	frac, isFile := rac.(*vfs.FileReaderAtCloser)
	if !isFile {
		return abort(fmt.Errorf("VFS ReaderAtCloser is not file-backed, cannot mmap"))
	}
	var f *os.File = frac.File

	// Refuse empty files up front instead of attempting to map them.
	info, err := f.Stat()
	if err != nil {
		return abort(fmt.Errorf("failed to stat file: %w", err))
	}
	size := info.Size()
	if size == 0 {
		return abort(fmt.Errorf("cannot mmap empty file"))
	}

	mm, err := mmap.Map(f, mmap.RDONLY, 0)
	if err != nil {
		return abort(fmt.Errorf("mmap failed (file size: %d): %w", size, err))
	}

	// From here on the segment owns the file and the mapping, so failures
	// are cleaned up through rv.Close() rather than through the reader.
	rv := &Segment{
		SegmentBase: SegmentBase{
			fieldsMap:      make(map[string]uint16),
			fieldFSTs:      make(map[uint16]*vellum.FST),
			vecIndexCache:  newVectorIndexCache(),
			synIndexCache:  newSynonymIndexCache(),
			fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
		},
		f:    f,
		mm:   mm,
		path: name, // the VFS name stands in for a filesystem path
		refs: 1,
	}
	rv.SegmentBase.updateSize()

	// Load the segment metadata in order: config, fields, doc-value readers.
	loaders := [...]func() error{rv.loadConfig, rv.loadFieldsNew, rv.loadDvReaders}
	for _, load := range loaders {
		if loadErr := load(); loadErr != nil {
			rv.Close()
			return nil, loadErr
		}
	}

	return rv, nil
}

// MergeVFS merges segments into a single new segment file called name,
// created through dir, without going through a temporary file.
//
// Every element of segments must be a *Segment or a *SegmentBase; any other
// type is rejected with an error. segments, drops and closeCh are handed to
// mergeToWriter unchanged, and s is attached to the counting writer that
// wraps the buffered output.
//
// On success it returns the document-number slices produced by mergeToWriter
// and the number of bytes counted by the writer. The output is flushed,
// synced and closed before returning, and a failure in any of those steps
// (including Close) is reported as an error.
//
// NOTE(review): on failure the partially written file is left behind in dir;
// confirm whether the VFS offers a way to remove it.
func (*ZapPlugin) MergeVFS(dir vfs.Directory, name string,
	segments []segment.Segment, drops []*roaring.Bitmap,
	closeCh chan struct{}, s segment.StatsReporter) ([][]uint64, uint64, error) {

	// 1. Extract SegmentBases from segments
	segmentBases := make([]*SegmentBase, len(segments))
	for segmenti, seg := range segments {
		switch segmentx := seg.(type) {
		case *Segment:
			segmentBases[segmenti] = &segmentx.SegmentBase
		case *SegmentBase:
			segmentBases[segmenti] = segmentx
		default:
			return nil, 0, fmt.Errorf("unexpected segment type: %T", seg)
		}
	}

	// 2. Create VFS writer for output
	w, err := dir.Create(name)
	if err != nil {
		return nil, 0, fmt.Errorf("VFS create %s: %w", name, err)
	}
	// On error paths the writer is closed best-effort (the original error is
	// the one worth reporting). On the success path it is closed explicitly
	// below so that a Close failure is not silently dropped.
	closed := false
	defer func() {
		if !closed {
			_ = w.Close()
		}
	}()

	// 3. Buffer the VFS output for performance
	br := bufio.NewWriterSize(w, DefaultFileMergerBufferSize)

	// 4. Wrap writer for counting and stats
	cr := NewCountHashWriterWithStatsReporter(br, s)

	// 5. Perform merge to writer (existing logic)
	newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
		mergeToWriter(segmentBases, drops, DefaultChunkMode, cr, closeCh)
	if err != nil {
		return nil, 0, fmt.Errorf("merge to writer: %w", err)
	}

	// 6. Write footer; sectionsIndexOffset is deliberately passed twice and
	// the following offset argument is 0.
	err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, sectionsIndexOffset,
		0, DefaultChunkMode, cr.Sum32(), cr)
	if err != nil {
		return nil, 0, fmt.Errorf("persist footer: %w", err)
	}

	// 7. Flush buffer
	if err := br.Flush(); err != nil {
		return nil, 0, fmt.Errorf("flush buffer: %w", err)
	}

	// 8. Sync to VFS for durability
	if err := w.Sync(); err != nil {
		return nil, 0, fmt.Errorf("VFS sync: %w", err)
	}

	// 9. Close the writer and surface any error; a failed close can mean the
	// data never reached the backing store.
	closed = true
	if err := w.Close(); err != nil {
		return nil, 0, fmt.Errorf("VFS close %s: %w", name, err)
	}

	return newDocNums, uint64(cr.Count()), nil
}