diff --git a/go.mod b/go.mod
index 3bf320d1..0397dae2 100644
--- a/go.mod
+++ b/go.mod
@@ -20,3 +20,9 @@ require (
 	github.com/spf13/pflag v1.0.5 // indirect
 	golang.org/x/sys v0.13.0 // indirect
 )
+
+// Use bleve_index_api branch with VFS support.
+// NOTE(review): replace directives only take effect in the main module, so
+// modules that import this one need the same replace until VFS support is
+// available upstream.
+replace github.com/blevesearch/bleve_index_api => github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01
diff --git a/go.sum b/go.sum
index 1555d8e7..91d4f84f 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,10 @@
 github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2JW2gggRdg=
 github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
+github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01 h1:SbGoS4vY5GDtDwxKy4iT+s0LsEBQMKd/FNRwTCnWssI=
+github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
 github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
 github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
 github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
-github.com/blevesearch/bleve_index_api v1.2.11 h1:bXQ54kVuwP8hdrXUSOnvTQfgK0KI1+f9A0ITJT8tX1s=
-github.com/blevesearch/bleve_index_api v1.2.11/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
 github.com/blevesearch/go-faiss v1.0.26 h1:4dRLolFgjPyjkaXwff4NfbZFdE/dfywbzDqporeQvXI=
 github.com/blevesearch/go-faiss v1.0.26/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
 github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
diff --git a/plugin.go b/plugin.go
index f67297ec..86027e07 100644
--- a/plugin.go
+++ b/plugin.go
@@ -14,6 +14,18 @@
 
 package zap
 
+import (
+	"bufio"
+	"fmt"
+	"os"
+
+	"github.com/RoaringBitmap/roaring/v2"
+	"github.com/blevesearch/bleve_index_api/vfs"
+	mmap "github.com/blevesearch/mmap-go"
+	segment "github.com/blevesearch/scorch_segment_api/v2"
+	"github.com/blevesearch/vellum"
+)
+
 // ZapPlugin implements the Plugin interface of
 // the blevesearch/scorch_segment_api pkg
 type ZapPlugin struct{}
@@ -25,3 +37,164 @@ func (*ZapPlugin) Type() string {
 func (*ZapPlugin) Version() uint32 {
 	return Version
 }
+
+// OpenVFS opens the segment file name through the VFS directory abstraction.
+// It uses OpenAt() and memory-maps the backing *os.File directly, so no
+// temporary file is created. The VFS must be file-backed: an error is
+// returned when the reader reports no file descriptor, is not a
+// *vfs.FileReaderAtCloser, or refers to an empty file.
+func (*ZapPlugin) OpenVFS(dir vfs.Directory, name string) (segment.Segment, error) {
+	// 1. Open file for random access from VFS
+	rac, err := dir.OpenAt(name)
+	if err != nil {
+		return nil, fmt.Errorf("VFS openAt %s: %w", name, err)
+	}
+
+	// 2. Get file descriptor for mmap; a zero descriptor is treated as
+	// "not file-backed". Close errors on the failure paths below are
+	// deliberately discarded: the error being returned is the relevant one.
+	if rac.AsFd() == 0 {
+		_ = rac.Close()
+		return nil, fmt.Errorf("VFS does not provide file descriptor for mmap (not file-backed)")
+	}
+
+	// 3. Get the underlying *os.File for mmap; FileReaderAtCloser exposes it
+	// as its File field.
+	frac, ok := rac.(*vfs.FileReaderAtCloser)
+	if !ok {
+		_ = rac.Close()
+		return nil, fmt.Errorf("VFS ReaderAtCloser is not file-backed, cannot mmap")
+	}
+	var f *os.File = frac.File
+
+	// 4. Memory-map the file, rejecting empty files up front
+	fileInfo, err := f.Stat()
+	if err != nil {
+		_ = rac.Close()
+		return nil, fmt.Errorf("failed to stat file: %w", err)
+	}
+	if fileInfo.Size() == 0 {
+		_ = rac.Close()
+		return nil, fmt.Errorf("cannot mmap empty file")
+	}
+
+	mm, err := mmap.Map(f, mmap.RDONLY, 0)
+	if err != nil {
+		_ = rac.Close()
+		return nil, fmt.Errorf("mmap failed (file size: %d): %w", fileInfo.Size(), err)
+	}
+
+	// 5. Create segment with mmap; failures from here on are cleaned up by
+	// calling rv.Close() instead of rac.Close().
+	rv := &Segment{
+		SegmentBase: SegmentBase{
+			fieldsMap:      make(map[string]uint16),
+			fieldFSTs:      make(map[uint16]*vellum.FST),
+			vecIndexCache:  newVectorIndexCache(),
+			synIndexCache:  newSynonymIndexCache(),
+			fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
+		},
+		f:    f,
+		mm:   mm,
+		path: name, // Use VFS name instead of filesystem path
+		refs: 1,
+	}
+	rv.SegmentBase.updateSize()
+
+	// 6. Load segment metadata
+	if err := rv.loadConfig(); err != nil {
+		_ = rv.Close()
+		return nil, err
+	}
+
+	if err := rv.loadFieldsNew(); err != nil {
+		_ = rv.Close()
+		return nil, err
+	}
+
+	if err := rv.loadDvReaders(); err != nil {
+		_ = rv.Close()
+		return nil, err
+	}
+
+	return rv, nil
+}
+
+// MergeVFS merges segments (with their drops bitmaps) and writes the result
+// to name through the VFS directory abstraction. The output is streamed
+// directly to the VFS writer, avoiding temporary files. It returns the new
+// document numbers from mergeToWriter and the byte count reported by the
+// counting writer. Flush, sync and close failures are all returned as errors.
+func (*ZapPlugin) MergeVFS(dir vfs.Directory, name string,
+	segments []segment.Segment, drops []*roaring.Bitmap,
+	closeCh chan struct{}, s segment.StatsReporter) ([][]uint64, uint64, error) {
+
+	// 1. Extract SegmentBases from segments
+	segmentBases := make([]*SegmentBase, len(segments))
+	for segmenti, seg := range segments {
+		switch segmentx := seg.(type) {
+		case *Segment:
+			segmentBases[segmenti] = &segmentx.SegmentBase
+		case *SegmentBase:
+			segmentBases[segmenti] = segmentx
+		default:
+			return nil, 0, fmt.Errorf("unexpected segment type: %T", seg)
+		}
+	}
+
+	// 2. Create VFS writer for output
+	w, err := dir.Create(name)
+	if err != nil {
+		return nil, 0, fmt.Errorf("VFS create %s: %w", name, err)
+	}
+	// Error returns close the writer best-effort; the success path closes it
+	// explicitly (step 9) so a failed Close is not mistaken for success.
+	// NOTE(review): a failed merge still leaves a partial file behind; remove
+	// it here if vfs.Directory offers a way to do so.
+	closed := false
+	defer func() {
+		if !closed {
+			_ = w.Close()
+		}
+	}()
+
+	// 3. Buffer the VFS output for performance
+	br := bufio.NewWriterSize(w, DefaultFileMergerBufferSize)
+
+	// 4. Wrap writer for counting and stats
+	cr := NewCountHashWriterWithStatsReporter(br, s)
+
+	// 5. Perform merge to writer (existing logic)
+	newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
+		mergeToWriter(segmentBases, drops, DefaultChunkMode, cr, closeCh)
+	if err != nil {
+		return nil, 0, fmt.Errorf("merge to writer: %w", err)
+	}
+
+	// 6. Write footer. NOTE(review): sectionsIndexOffset is passed twice and
+	// the next offset is 0; presumably intentional — confirm against
+	// persistFooter's parameter list.
+	err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, sectionsIndexOffset,
+		0, DefaultChunkMode, cr.Sum32(), cr)
+	if err != nil {
+		return nil, 0, fmt.Errorf("persist footer: %w", err)
+	}
+
+	// 7. Flush buffer
+	if err := br.Flush(); err != nil {
+		return nil, 0, fmt.Errorf("flush buffer: %w", err)
+	}
+
+	// 8. Sync to VFS for durability
+	if err := w.Sync(); err != nil {
+		return nil, 0, fmt.Errorf("VFS sync: %w", err)
+	}
+
+	// 9. Close the writer, reporting a failure instead of dropping it
+	closed = true
+	if err := w.Close(); err != nil {
+		return nil, 0, fmt.Errorf("VFS close %s: %w", name, err)
+	}
+
+	return newDocNums, uint64(cr.Count()), nil
+}