From b28b519d225252b947d6bbb13689a1fc785e0ed1 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Tue, 11 Nov 2025 23:50:12 +0530 Subject: [PATCH 01/23] Introducing Zapx V17 --- build.go | 12 +-- inverted_text_cache.go | 105 +++++++++++++++++++++ segment.go | 208 ++++++++++------------------------------- synonym_cache.go | 18 ++-- thesaurus.go | 2 + 5 files changed, 171 insertions(+), 174 deletions(-) create mode 100644 inverted_text_cache.go diff --git a/build.go b/build.go index 7843653a..19476e9b 100644 --- a/build.go +++ b/build.go @@ -22,11 +22,10 @@ import ( "os" index "github.com/blevesearch/bleve_index_api" - "github.com/blevesearch/vellum" ) -const Version uint32 = 16 -const IndexSectionsVersion uint32 = 16 +const Version uint32 = 17 + const Type string = "zap" const fieldNotUninverted = math.MaxUint64 @@ -171,19 +170,18 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), docValueOffset: 0, // docValueOffsets identified automatically by the section updatedFields: make(map[string]*index.UpdateFieldInfo), - fieldFSTs: make(map[uint16]*vellum.FST), + invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), - // following fields gets populated by loadFieldsNew + // following fields gets populated by loadFields fieldsMap: make(map[string]uint16), - dictLocs: make([]uint64, 0), fieldsInv: make([]string, 0), } sb.updateSize() // load the data/section starting offsets for each field // by via the sectionsIndexOffset as starting point. - err := sb.loadFieldsNew() + err := sb.loadFields() if err != nil { return nil, err } diff --git a/inverted_text_cache.go b/inverted_text_cache.go new file mode 100644 index 00000000..c77002ef --- /dev/null +++ b/inverted_text_cache.go @@ -0,0 +1,105 @@ +// Copyright (c) 2025 Couchbase, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "encoding/binary" + "fmt" + "sync" + + "github.com/blevesearch/vellum" +) + +func newInvertedIndexCache() *invertedIndexCache { + return &invertedIndexCache{ + cache: make(map[uint16]*invertedCacheEntry), + } +} + +type invertedIndexCache struct { + m sync.RWMutex + + cache map[uint16]*invertedCacheEntry +} + +// Clear clears the synonym cache which would mean tha the termID to term map would no longer be available. +func (sc *invertedIndexCache) Clear() { + sc.m.Lock() + sc.cache = nil + sc.m.Unlock() +} + +// loadOrCreate loads the inverted index cache for the specified fieldID if it is already present, +// or creates it if not. The inverted index cache for a fieldID consists of an FST (Finite State Transducer): +// - A Vellum FST (Finite State Transducer) representing the TermDictionary. +// This function returns the loaded or newly created FST, and the number of bytes read from the provided memory slice, +// if the cache was created. 
+func (sc *invertedIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, uint64, error) { + sc.m.RLock() + entry, ok := sc.cache[fieldID] + if ok { + sc.m.RUnlock() + return entry.load() + } + + sc.m.RUnlock() + + sc.m.Lock() + defer sc.m.Unlock() + + entry, ok = sc.cache[fieldID] + if ok { + return entry.load() + } + + return sc.createAndCacheLOCKED(fieldID, mem) +} + +// createAndCacheLOCKED creates the inverted index cache for the specified fieldID and caches it. +func (sc *invertedIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, uint64, error) { + var pos uint64 + vellumLen, read := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) + if vellumLen == 0 || read <= 0 { + return nil, 0, fmt.Errorf("vellum length is 0") + } + pos += uint64(read) + fstBytes := mem[pos : pos+vellumLen] + fst, err := vellum.Load(fstBytes) + if err != nil { + return nil, 0, fmt.Errorf("vellum err: %v", err) + } + pos += vellumLen + sc.insertLOCKED(fieldID, fst) + return fst, pos, nil +} + +// insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID. +func (sc *invertedIndexCache) insertLOCKED(fieldID uint16, fst *vellum.FST) { + _, ok := sc.cache[fieldID] + if !ok { + sc.cache[fieldID] = &invertedCacheEntry{ + fst: fst, + } + } +} + +// invertedCacheEntry is the vellum FST and is the value stored in the invertedIndexCache cache, for a given fieldID. 
+type invertedCacheEntry struct { + fst *vellum.FST +} + +func (ce *invertedCacheEntry) load() (*vellum.FST, uint64, error) { + return ce.fst, 0, nil +} diff --git a/segment.go b/segment.go index 461fdf5a..1459fb95 100644 --- a/segment.go +++ b/segment.go @@ -28,7 +28,6 @@ import ( index "github.com/blevesearch/bleve_index_api" mmap "github.com/blevesearch/mmap-go" segment "github.com/blevesearch/scorch_segment_api/v2" - "github.com/blevesearch/vellum" "github.com/golang/snappy" ) @@ -55,7 +54,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { rv := &Segment{ SegmentBase: SegmentBase{ fieldsMap: make(map[string]uint16), - fieldFSTs: make(map[uint16]*vellum.FST), + invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), @@ -73,7 +72,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { return nil, err } - err = rv.loadFieldsNew() + err = rv.loadFields() if err != nil { _ = rv.Close() return nil, err @@ -105,17 +104,14 @@ type SegmentBase struct { fieldsIndexOffset uint64 sectionsIndexOffset uint64 docValueOffset uint64 - dictLocs []uint64 fieldDvReaders []map[uint16]*docValueReader // naive chunk cache per field; section->field->reader fieldDvNames []string // field names cached in fieldDvReaders size uint64 updatedFields map[string]*index.UpdateFieldInfo - m sync.Mutex - fieldFSTs map[uint16]*vellum.FST - - // this cache comes into play when vectors are supported in builds. 
+ // section-specific cache + invIndexCache *invertedIndexCache vecIndexCache *vectorIndexCache synIndexCache *synonymIndexCache } @@ -133,11 +129,10 @@ func (sb *SegmentBase) updateSize() { sizeInBytes += (len(k) + SizeOfString) + SizeOfUint16 } - // fieldsInv, dictLocs + // fieldsInv for _, entry := range sb.fieldsInv { sizeInBytes += len(entry) + SizeOfString } - sizeInBytes += len(sb.dictLocs) * SizeOfUint64 // fieldDvReaders for _, secDvReaders := range sb.fieldDvReaders { @@ -155,6 +150,7 @@ func (sb *SegmentBase) updateSize() { func (sb *SegmentBase) AddRef() {} func (sb *SegmentBase) DecRef() (err error) { return nil } func (sb *SegmentBase) Close() (err error) { + sb.invIndexCache.Clear() sb.vecIndexCache.Clear() sb.synIndexCache.Clear() return nil @@ -212,7 +208,7 @@ func (s *Segment) loadConfig() error { verOffset := crcOffset - 4 s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) - if Version < IndexSectionsVersion && s.version != Version { + if s.version != Version { return fmt.Errorf("unsupported version %d != %d", s.version, Version) } @@ -226,15 +222,9 @@ func (s *Segment) loadConfig() error { // determining the right footer size based on version, this becomes important // while loading the fields portion or the sections portion of the index file. 
- var footerSize int - if s.version >= IndexSectionsVersion { - // for version 16 and above, parse the sectionsIndexOffset - s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) - fieldsIndexOffset = fieldsIndexOffset - 8 - footerSize = FooterSize - } else { - footerSize = FooterSize - 8 - } + s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) + fieldsIndexOffset = fieldsIndexOffset - 8 + footerSize := FooterSize s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) @@ -287,48 +277,16 @@ func (sb *SegmentBase) BytesRead() uint64 { func (sb *SegmentBase) ResetBytesRead(val uint64) {} func (sb *SegmentBase) incrementBytesRead(val uint64) { + fmt.Println("oldValue - bytesRead - ", sb.bytesRead) atomic.AddUint64(&sb.bytesRead, val) + fmt.Println("newValue - bytesRead - ", sb.bytesRead) } func (sb *SegmentBase) loadFields() error { - // NOTE for now we assume the fields index immediately precedes - // the footer, and if this changes, need to adjust accordingly (or - // store explicit length), where s.mem was sliced from s.mm in Open(). 
- fieldsIndexEnd := uint64(len(sb.mem)) - - // iterate through fields index - var fieldID uint64 - for sb.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd { - addr := binary.BigEndian.Uint64(sb.mem[sb.fieldsIndexOffset+(8*fieldID) : sb.fieldsIndexOffset+(8*fieldID)+8]) - - // accounting the address of the dictLoc being read from file - sb.incrementBytesRead(8) - - dictLoc, read := binary.Uvarint(sb.mem[addr:fieldsIndexEnd]) - n := uint64(read) - sb.dictLocs = append(sb.dictLocs, dictLoc) - - var nameLen uint64 - nameLen, read = binary.Uvarint(sb.mem[addr+n : fieldsIndexEnd]) - n += uint64(read) - - name := string(sb.mem[addr+n : addr+n+nameLen]) - - sb.incrementBytesRead(n + nameLen) - sb.fieldsInv = append(sb.fieldsInv, name) - sb.fieldsMap[name] = uint16(fieldID + 1) - - fieldID++ - } - return nil -} - -func (sb *SegmentBase) loadFieldsNew() error { pos := sb.sectionsIndexOffset if pos == 0 { - // this is the case only for older file formats - return sb.loadFields() + return fmt.Errorf("no sections index present") } seek := pos + binary.MaxVarintLen64 @@ -360,7 +318,7 @@ func (sb *SegmentBase) loadFieldsNew() error { fieldSectionMap := make(map[uint16]uint64) - err := sb.loadFieldNew(uint16(fieldID), addr, fieldSectionMap) + err := sb.loadField(uint16(fieldID), addr, fieldSectionMap) if err != nil { return err } @@ -374,7 +332,7 @@ func (sb *SegmentBase) loadFieldsNew() error { return nil } -func (sb *SegmentBase) loadFieldNew(fieldID uint16, pos uint64, +func (sb *SegmentBase) loadField(fieldID uint16, pos uint64, fieldSectionMap map[uint16]uint64) error { if pos == 0 { // there is no indexing structure present for this field/section @@ -401,28 +359,6 @@ func (sb *SegmentBase) loadFieldNew(fieldID uint16, pos uint64, fieldSectionAddr := binary.BigEndian.Uint64(sb.mem[pos : pos+8]) pos += 8 fieldSectionMap[fieldSectionType] = fieldSectionAddr - if fieldSectionType == SectionInvertedTextIndex { - // for the fields which don't have the inverted index, the offset is - 
// 0 and during query time, because there is no valid dictionary we - // will just have follow a no-op path. - if fieldSectionAddr == 0 { - sb.dictLocs = append(sb.dictLocs, 0) - continue - } - - read := 0 - // skip the doc values - _, n := binary.Uvarint(sb.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64]) - fieldSectionAddr += uint64(n) - read += n - _, n = binary.Uvarint(sb.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64]) - fieldSectionAddr += uint64(n) - read += n - dictLoc, n := binary.Uvarint(sb.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64]) - // account the bytes read while parsing the field's inverted index section - sb.incrementBytesRead(uint64(read + n)) - sb.dictLocs = append(sb.dictLocs, dictLoc) - } } // account the bytes read while parsing the sections field index. @@ -441,41 +377,33 @@ func (sb *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { fieldIDPlus1 := sb.fieldsMap[field] - if fieldIDPlus1 > 0 { + if fieldIDPlus1 == 0 { + return nil, nil + } + pos := sb.fieldsSectionsMap[fieldIDPlus1-1][SectionInvertedTextIndex] + if pos > 0 { rv = &Dictionary{ sb: sb, field: field, fieldID: fieldIDPlus1 - 1, } - - dictStart := sb.dictLocs[rv.fieldID] - if dictStart > 0 { - var ok bool - sb.m.Lock() - if rv.fst, ok = sb.fieldFSTs[rv.fieldID]; !ok { - // read the length of the vellum data - vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64]) - if vellumLen == 0 { - sb.m.Unlock() - return nil, fmt.Errorf("empty dictionary for field: %v", field) - } - fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] - rv.incrementBytesRead(uint64(read) + vellumLen) - rv.fst, err = vellum.Load(fstBytes) - if err != nil { - sb.m.Unlock() - return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, err) - } - - sb.fieldFSTs[rv.fieldID] = rv.fst - } - - sb.m.Unlock() - 
rv.fstReader, err = rv.fst.Reader() - if err != nil { - return nil, fmt.Errorf("dictionary field %s vellum reader err: %v", field, err) - } + // skip the doc value offsets to get to the dictionary portion + for i := 0; i < 2; i++ { + _, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + } + dictLoc, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + fst, bytesRead, err := sb.invIndexCache.loadOrCreate(rv.fieldID, sb.mem[dictLoc:]) + if err != nil { + return nil, fmt.Errorf("dictionary for field %s err: %v", field, err) } + rv.fst = fst + rv.fstReader, err = rv.fst.Reader() + if err != nil { + return nil, fmt.Errorf("dictionary nafor field %s, vellum reader err: %v", field, err) + } + rv.bytesRead += bytesRead } return rv, nil @@ -509,7 +437,7 @@ func (sb *SegmentBase) thesaurus(name string) (rv *Thesaurus, err error) { } thesLoc, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) - fst, synTermMap, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:]) + fst, synTermMap, bytesRead, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:]) if err != nil { return nil, fmt.Errorf("thesaurus name %s err: %v", name, err) } @@ -519,6 +447,7 @@ func (sb *SegmentBase) thesaurus(name string) (rv *Thesaurus, err error) { if err != nil { return nil, fmt.Errorf("thesaurus name %s vellum reader err: %v", name, err) } + rv.bytesRead += bytesRead } return rv, nil } @@ -700,6 +629,7 @@ func (s *Segment) Close() (err error) { func (s *Segment) closeActual() (err error) { // clear contents from the vector and synonym index cache before un-mmapping + s.invIndexCache.Clear() s.vecIndexCache.Clear() s.synIndexCache.Clear() @@ -765,10 +695,18 @@ func (s *Segment) NumDocs() uint64 { func (s *Segment) DictAddr(field string) (uint64, error) { fieldIDPlus1, ok := s.fieldsMap[field] if !ok { - return 0, fmt.Errorf("no such field '%s'", field) + return 0, fmt.Errorf("no dictionary for 
field '%s'", field) } - - return s.dictLocs[fieldIDPlus1-1], nil + dictStart := s.fieldsSectionsMap[fieldIDPlus1-1][SectionInvertedTextIndex] + if dictStart == 0 { + return 0, fmt.Errorf("no dictionary for field '%s'", field) + } + for i := 0; i < 2; i++ { + _, n := binary.Uvarint(s.mem[dictStart : dictStart+binary.MaxVarintLen64]) + dictStart += uint64(n) + } + dictLoc, _ := binary.Uvarint(s.mem[dictStart : dictStart+binary.MaxVarintLen64]) + return dictLoc, nil } // ThesaurusAddr is a helper function to compute the file offset where the @@ -842,48 +780,6 @@ func (s *Segment) loadDvReader(fieldID int, secID uint16) error { return nil } -func (s *Segment) loadDvReadersLegacy() error { - // older file formats to parse the docValueIndex and if that says doc values - // aren't there in this segment file, just return nil - if s.docValueOffset == fieldNotUninverted { - return nil - } - - for fieldID := range s.fieldsInv { - var read uint64 - start, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) - if n <= 0 { - return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %d", fieldID) - } - read += uint64(n) - end, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) - if n <= 0 { - return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID) - } - read += uint64(n) - s.incrementBytesRead(read) - - fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], start, end) - if err != nil { - return err - } - - if fieldDvReader != nil { - // older file formats have docValues corresponding only to inverted index - // ignore the rest. 
- if s.fieldDvReaders[SectionInvertedTextIndex] == nil { - s.fieldDvReaders[SectionInvertedTextIndex] = make(map[uint16]*docValueReader) - } - // fix the structure of fieldDvReaders - // currently it populates the inverted index doc values - s.fieldDvReaders[SectionInvertedTextIndex][uint16(fieldID)] = fieldDvReader - s.fieldDvNames = append(s.fieldDvNames, s.fieldsInv[fieldID]) - } - } - - return nil -} - // Segment is a file segment, and loading the dv readers from that segment // must account for the version while loading since the formats are different // in the older and the Version version. @@ -892,10 +788,6 @@ func (s *Segment) loadDvReaders() error { return nil } - if s.version < IndexSectionsVersion { - return s.loadDvReadersLegacy() - } - // for every section of every field, load the doc values and register // the readers. for fieldID := range s.fieldsInv { diff --git a/synonym_cache.go b/synonym_cache.go index 0b8d56c2..a4d24f92 100644 --- a/synonym_cache.go +++ b/synonym_cache.go @@ -46,7 +46,7 @@ func (sc *synonymIndexCache) Clear() { // - A Vellum FST (Finite State Transducer) representing the thesaurus. // - A map associating synonym IDs to their corresponding terms. // This function returns the loaded or newly created tuple (FST and map). -func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) { +func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, uint64, error) { sc.m.RLock() entry, ok := sc.cache[fieldID] if ok { @@ -68,23 +68,23 @@ func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.F } // createAndCacheLOCKED creates the synonym index cache for the specified fieldID and caches it. 
-func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) { +func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, uint64, error) { var pos uint64 vellumLen, read := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) if vellumLen == 0 || read <= 0 { - return nil, nil, fmt.Errorf("vellum length is 0") + return nil, nil, 0, fmt.Errorf("vellum length is 0") } pos += uint64(read) fstBytes := mem[pos : pos+vellumLen] fst, err := vellum.Load(fstBytes) if err != nil { - return nil, nil, fmt.Errorf("vellum err: %v", err) + return nil, nil, 0, fmt.Errorf("vellum err: %v", err) } pos += vellumLen numSyns, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) if numSyns == 0 { - return nil, nil, fmt.Errorf("no synonyms found") + return nil, nil, 0, fmt.Errorf("no synonyms found") } synTermMap := make(map[uint32][]byte, numSyns) for i := 0; i < int(numSyns); i++ { @@ -93,14 +93,14 @@ func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (* termLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) if termLen == 0 { - return nil, nil, fmt.Errorf("term length is 0") + return nil, nil, 0, fmt.Errorf("term length is 0") } term := mem[pos : pos+uint64(termLen)] pos += uint64(termLen) synTermMap[uint32(synID)] = term } sc.insertLOCKED(fieldID, fst, synTermMap) - return fst, synTermMap, nil + return fst, synTermMap, pos, nil } // insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID. 
@@ -121,6 +121,6 @@ type synonymCacheEntry struct { synTermMap map[uint32][]byte } -func (ce *synonymCacheEntry) load() (*vellum.FST, map[uint32][]byte, error) { - return ce.fst, ce.synTermMap, nil +func (ce *synonymCacheEntry) load() (*vellum.FST, map[uint32][]byte, uint64, error) { + return ce.fst, ce.synTermMap, 0, nil } diff --git a/thesaurus.go b/thesaurus.go index 34a43629..f97aaf29 100644 --- a/thesaurus.go +++ b/thesaurus.go @@ -33,6 +33,8 @@ type Thesaurus struct { fst *vellum.FST fstReader *vellum.Reader + + bytesRead uint64 } // represents an immutable, empty Thesaurus From d974de5db64985f5a8bfc93b1985cd58c3758b13 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Tue, 11 Nov 2025 23:57:33 +0530 Subject: [PATCH 02/23] remove debug print statements --- segment.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/segment.go b/segment.go index 1459fb95..9e5fe146 100644 --- a/segment.go +++ b/segment.go @@ -277,9 +277,7 @@ func (sb *SegmentBase) BytesRead() uint64 { func (sb *SegmentBase) ResetBytesRead(val uint64) {} func (sb *SegmentBase) incrementBytesRead(val uint64) { - fmt.Println("oldValue - bytesRead - ", sb.bytesRead) atomic.AddUint64(&sb.bytesRead, val) - fmt.Println("newValue - bytesRead - ", sb.bytesRead) } func (sb *SegmentBase) loadFields() error { From 97898ccfc74f13595582409887c731960d330d4a Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 00:02:27 +0530 Subject: [PATCH 03/23] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- inverted_text_cache.go | 4 ++-- segment.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/inverted_text_cache.go b/inverted_text_cache.go index c77002ef..01cef2e1 100644 --- a/inverted_text_cache.go +++ b/inverted_text_cache.go @@ -34,7 +34,7 @@ type invertedIndexCache struct { cache map[uint16]*invertedCacheEntry } -// Clear clears the synonym cache which would mean tha the termID to term map would no longer be available. 
+// Clear clears the synonym cache which would mean that the termID to term map would no longer be available. func (sc *invertedIndexCache) Clear() { sc.m.Lock() sc.cache = nil @@ -85,7 +85,7 @@ func (sc *invertedIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) ( return fst, pos, nil } -// insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID. +// insertLOCKED inserts the vellum FST into the cache for the specified fieldID. func (sc *invertedIndexCache) insertLOCKED(fieldID uint16, fst *vellum.FST) { _, ok := sc.cache[fieldID] if !ok { diff --git a/segment.go b/segment.go index 9e5fe146..9952ffcf 100644 --- a/segment.go +++ b/segment.go @@ -399,7 +399,7 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { rv.fst = fst rv.fstReader, err = rv.fst.Reader() if err != nil { - return nil, fmt.Errorf("dictionary nafor field %s, vellum reader err: %v", field, err) + return nil, fmt.Errorf("dictionary for field %s, vellum reader err: %v", field, err) } rv.bytesRead += bytesRead } From 583d6dcba82fceb45e08259ea7bb63888dbbbb70 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 00:07:46 +0530 Subject: [PATCH 04/23] small error msg fix --- segment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/segment.go b/segment.go index 9952ffcf..d962b7d0 100644 --- a/segment.go +++ b/segment.go @@ -693,7 +693,7 @@ func (s *Segment) NumDocs() uint64 { func (s *Segment) DictAddr(field string) (uint64, error) { fieldIDPlus1, ok := s.fieldsMap[field] if !ok { - return 0, fmt.Errorf("no dictionary for field '%s'", field) + return 0, fmt.Errorf("no such field '%s'", field) } dictStart := s.fieldsSectionsMap[fieldIDPlus1-1][SectionInvertedTextIndex] if dictStart == 0 { From 187f4e91e11b9081d06414a75dae549318d881f0 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 00:24:35 +0530 Subject: [PATCH 05/23] remove redundant docvalue reader code --- 
segment.go | 78 ------------------------------------------------------ 1 file changed, 78 deletions(-) diff --git a/segment.go b/segment.go index d962b7d0..5f3dcf99 100644 --- a/segment.go +++ b/segment.go @@ -726,91 +726,13 @@ func (s *Segment) ThesaurusAddr(name string) (uint64, error) { return thesLoc, nil } -func (s *Segment) getSectionDvOffsets(fieldID int, secID uint16) (uint64, uint64, uint64, error) { - // Version is gonna be 16 - var fieldLocStart uint64 = fieldNotUninverted - fieldLocEnd := fieldLocStart - sectionMap := s.fieldsSectionsMap[fieldID] - fieldAddrStart := sectionMap[secID] - n := 0 - - if fieldAddrStart > 0 { - // fixed encoding as of now, need to uvarint this - var read uint64 - fieldLocStart, n = binary.Uvarint(s.mem[fieldAddrStart+read : fieldAddrStart+read+binary.MaxVarintLen64]) - if n <= 0 { - return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %d", fieldID) - } - read += uint64(n) - - fieldLocEnd, n = binary.Uvarint(s.mem[fieldAddrStart+read : fieldAddrStart+read+binary.MaxVarintLen64]) - if n <= 0 { - return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID) - } - read += uint64(n) - - s.incrementBytesRead(read) - } - - return fieldLocStart, fieldLocEnd, 0, nil -} - -func (s *Segment) loadDvReader(fieldID int, secID uint16) error { - start, end, _, err := s.getSectionDvOffsets(fieldID, secID) - if err != nil { - return err - } - - fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], start, end) - if err != nil { - return err - } - - if fieldDvReader != nil { - if s.fieldDvReaders[secID] == nil { - s.fieldDvReaders[secID] = make(map[uint16]*docValueReader) - } - // fix the structure of fieldDvReaders - // currently it populates the inverted index doc values - s.fieldDvReaders[secID][uint16(fieldID)] = fieldDvReader - s.fieldDvNames = append(s.fieldDvNames, s.fieldsInv[fieldID]) - } - return nil -} - -// Segment is a file segment, 
and loading the dv readers from that segment -// must account for the version while loading since the formats are different -// in the older and the Version version. -func (s *Segment) loadDvReaders() error { - if s.numDocs == 0 { - return nil - } - - // for every section of every field, load the doc values and register - // the readers. - for fieldID := range s.fieldsInv { - for secID := range segmentSections { - s.loadDvReader(fieldID, secID) - } - } - - return nil -} - -// since segmentBase is an in-memory segment, it can be called only -// for v16 file formats as part of InitSegmentBase() while introducing -// a segment into the system. func (sb *SegmentBase) loadDvReaders() error { - - // evaluate -> s.docValueOffset == fieldNotUninverted if sb.numDocs == 0 { return nil } - for fieldID, sections := range sb.fieldsSectionsMap { for secID, secOffset := range sections { if secOffset > 0 { - // fixed encoding as of now, need to uvarint this pos := secOffset var read uint64 fieldLocStart, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) From 2a205bbc3853a1dddf6cbc45de646a43adc189d5 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 01:34:11 +0530 Subject: [PATCH 06/23] fix footer --- build.go | 5 +--- cmd/zap/cmd/docvalue.go | 2 +- cmd/zap/cmd/fields.go | 2 +- cmd/zap/cmd/footer.go | 3 +-- cmd/zap/cmd/synonym.go | 2 +- cmd/zap/cmd/vector.go | 2 +- merge.go | 4 +-- segment.go | 54 ++++++++++++++++++----------------------- write.go | 24 ++++++------------ zap.md | 34 ++++++++++++-------------- 10 files changed, 54 insertions(+), 78 deletions(-) diff --git a/build.go b/build.go index 19476e9b..ee3c3b09 100644 --- a/build.go +++ b/build.go @@ -98,8 +98,7 @@ func persistSegmentBaseToWriter(sb *SegmentBase, w io.Writer) (int, error) { return 0, err } - err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.sectionsIndexOffset, - sb.docValueOffset, sb.chunkMode, sb.memCRC, br) + err = persistFooter(sb.numDocs, 
sb.storedIndexOffset, sb.sectionsIndexOffset, sb.chunkMode, sb.memCRC, br) if err != nil { return 0, err } @@ -165,10 +164,8 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 chunkMode: chunkMode, numDocs: numDocs, storedIndexOffset: storedIndexOffset, - fieldsIndexOffset: sectionsIndexOffset, sectionsIndexOffset: sectionsIndexOffset, fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), - docValueOffset: 0, // docValueOffsets identified automatically by the section updatedFields: make(map[string]*index.UpdateFieldInfo), invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), diff --git a/cmd/zap/cmd/docvalue.go b/cmd/zap/cmd/docvalue.go index 86cf427d..d28e35dd 100644 --- a/cmd/zap/cmd/docvalue.go +++ b/cmd/zap/cmd/docvalue.go @@ -193,7 +193,7 @@ func docValueCmd(cmd *cobra.Command, args []string) error { data := segment.Data() // iterate through fields index - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats return fmt.Errorf("file format not supported") diff --git a/cmd/zap/cmd/fields.go b/cmd/zap/cmd/fields.go index 1b0bb8b9..1eec7882 100644 --- a/cmd/zap/cmd/fields.go +++ b/cmd/zap/cmd/fields.go @@ -32,7 +32,7 @@ var fieldsCmd = &cobra.Command{ } data := segment.Data() - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats return fmt.Errorf("file format not supported") diff --git a/cmd/zap/cmd/footer.go b/cmd/zap/cmd/footer.go index 1423a6b6..07d8bbb8 100644 --- a/cmd/zap/cmd/footer.go +++ b/cmd/zap/cmd/footer.go @@ -31,9 +31,8 @@ var footerCmd = &cobra.Command{ fmt.Printf("CRC: %#x\n", segment.CRC()) fmt.Printf("Version: %d\n", segment.Version()) fmt.Printf("Chunk Mode: %d\n", segment.ChunkMode()) - fmt.Printf("Fields Idx: %d (%#x)\n", segment.FieldsIndexOffset(), segment.FieldsIndexOffset()) + fmt.Printf("Sections 
Idx: %d (%#x)\n", segment.SectionsIndexOffset(), segment.SectionsIndexOffset()) fmt.Printf("Stored Idx: %d (%#x)\n", segment.StoredIndexOffset(), segment.StoredIndexOffset()) - fmt.Printf("DocValue Idx: %d (%#x)\n", segment.DocValueOffset(), segment.DocValueOffset()) fmt.Printf("Num Docs: %d\n", segment.NumDocs()) return nil }, diff --git a/cmd/zap/cmd/synonym.go b/cmd/zap/cmd/synonym.go index 1b3f7101..721c4451 100644 --- a/cmd/zap/cmd/synonym.go +++ b/cmd/zap/cmd/synonym.go @@ -29,7 +29,7 @@ var thesaurusCmd = &cobra.Command{ Short: "thesaurus prints the thesaurus with the specified name", Long: `The thesaurus command lets you print the thesaurus with the specified name.`, RunE: func(cmd *cobra.Command, args []string) error { - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats return fmt.Errorf("file format not supported") diff --git a/cmd/zap/cmd/vector.go b/cmd/zap/cmd/vector.go index ffa4dabd..ce16b2e3 100644 --- a/cmd/zap/cmd/vector.go +++ b/cmd/zap/cmd/vector.go @@ -40,7 +40,7 @@ var vectorCmd = &cobra.Command{ 4. 
reconstruct vector given the vectorID.`, RunE: func(cmd *cobra.Command, args []string) error { data := segment.Data() - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats diff --git a/merge.go b/merge.go index 6197af11..e0fcaf4c 100644 --- a/merge.go +++ b/merge.go @@ -81,9 +81,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat return nil, 0, err } - // passing the sectionsIndexOffset as fieldsIndexOffset and the docValueOffset as 0 for the footer - err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, sectionsIndexOffset, - 0, chunkMode, cr.Sum32(), cr) + err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, chunkMode, cr.Sum32(), cr) if err != nil { cleanup() return nil, 0, err diff --git a/segment.go b/segment.go index 5f3dcf99..38e3b3a1 100644 --- a/segment.go +++ b/segment.go @@ -101,16 +101,15 @@ type SegmentBase struct { fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address numDocs uint64 storedIndexOffset uint64 - fieldsIndexOffset uint64 sectionsIndexOffset uint64 - docValueOffset uint64 fieldDvReaders []map[uint16]*docValueReader // naive chunk cache per field; section->field->reader fieldDvNames []string // field names cached in fieldDvReaders size uint64 + // index update specific tracking updatedFields map[string]*index.UpdateFieldInfo - // section-specific cache + // section-specific caches invIndexCache *invertedIndexCache vecIndexCache *vectorIndexCache synIndexCache *synonymIndexCache @@ -203,41 +202,39 @@ func (s *Segment) DecRef() (err error) { } func (s *Segment) loadConfig() error { + // read offsets of 32 bit values - crc, ver, chunk crcOffset := len(s.mm) - 4 + verOffset := crcOffset - 4 + chunkOffset := verOffset - 4 + + // read offsets of 64 bit values - sectionsIndexOffset, storedIndexOffset, numDocsOffset + sectionsIndexOffset := chunkOffset - 8 + storedIndexOffset := 
sectionsIndexOffset - 8 + numDocsOffset := storedIndexOffset - 8 + + // read 32-bit crc s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4]) - verOffset := crcOffset - 4 + // read 32-bit version s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) if s.version != Version { return fmt.Errorf("unsupported version %d != %d", s.version, Version) } - chunkOffset := verOffset - 4 + // read 32-bit chunk mode s.chunkMode = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4]) - docValueOffset := chunkOffset - 8 - s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8]) + // read 64-bit sections index offset + s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[sectionsIndexOffset : sectionsIndexOffset+8]) - fieldsIndexOffset := docValueOffset - 8 - - // determining the right footer size based on version, this becomes important - // while loading the fields portion or the sections portion of the index file. - s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) - fieldsIndexOffset = fieldsIndexOffset - 8 - footerSize := FooterSize - - s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) - - storedIndexOffset := fieldsIndexOffset - 8 + // read 64-bit stored index offset s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8]) - numDocsOffset := storedIndexOffset - 8 + // read 64-bit num docs s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8]) - // 8*4 + 4*3 = 44 bytes being accounted from all the offsets - // above being read from the file - s.incrementBytesRead(uint64(footerSize)) - s.SegmentBase.mem = s.mm[:len(s.mm)-footerSize] + s.incrementBytesRead(uint64(FooterSize)) + s.SegmentBase.mem = s.mm[:len(s.mm)-FooterSize] return nil } @@ -668,9 +665,9 @@ func (s *Segment) ChunkMode() uint32 { return s.chunkMode } -// FieldsIndexOffset returns the fields index offset in the file 
footer -func (s *Segment) FieldsIndexOffset() uint64 { - return s.fieldsIndexOffset +// SectionsIndexOffset returns the sections index offset in the file footer +func (s *Segment) SectionsIndexOffset() uint64 { + return s.sectionsIndexOffset } // StoredIndexOffset returns the stored value index offset in the file footer @@ -678,11 +675,6 @@ func (s *Segment) StoredIndexOffset() uint64 { return s.storedIndexOffset } -// DocValueOffset returns the docValue offset in the file footer -func (s *Segment) DocValueOffset() uint64 { - return s.docValueOffset -} - // NumDocs returns the number of documents in the file footer func (s *Segment) NumDocs() uint64 { return s.numDocs diff --git a/write.go b/write.go index 4e7f5523..39382d05 100644 --- a/write.go +++ b/write.go @@ -104,12 +104,11 @@ func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int } // FooterSize is the size of the footer record in bytes -// crc + ver + chunk + docValueOffset + sectionsIndexOffset + field offset + stored offset + num docs -const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8 + 8 +// crc + ver + chunk + sectionsIndexOffset + stored offset + num docs +const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 -// in the index sections format, the fieldsIndexOffset points to the sectionsIndexOffset -func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, sectionsIndexOffset, docValueOffset uint64, - chunkMode uint32, crcBeforeFooter uint32, writerIn io.Writer) error { +func persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset uint64, + chunkMode, crcBeforeFooter uint32, writerIn io.Writer) error { w := NewCountHashWriter(writerIn) w.crc = crcBeforeFooter @@ -118,38 +117,31 @@ func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, sectionsIndexO if err != nil { return err } + // write out the stored field index location: err = binary.Write(w, binary.BigEndian, storedIndexOffset) if err != nil { return err } - // write out the field index location - err = 
binary.Write(w, binary.BigEndian, fieldsIndexOffset) - if err != nil { - return err - } - // write out the new field index location (to be removed later, as this can eventually replace the old) + // write out the sections index location err = binary.Write(w, binary.BigEndian, sectionsIndexOffset) if err != nil { return err } - // write out the fieldDocValue location - err = binary.Write(w, binary.BigEndian, docValueOffset) - if err != nil { - return err - } // write out 32-bit chunk factor err = binary.Write(w, binary.BigEndian, chunkMode) if err != nil { return err } + // write out 32-bit version err = binary.Write(w, binary.BigEndian, Version) if err != nil { return err } + // write out CRC-32 of everything upto but not including this CRC err = binary.Write(w, binary.BigEndian, w.crc) if err != nil { diff --git a/zap.md b/zap.md index 125cadb1..c9dc529d 100644 --- a/zap.md +++ b/zap.md @@ -36,21 +36,21 @@ Footer section describes the configuration of particular ZAP file. The format of footer is version-dependent, so it is necessary to check `V` field before the parsing. 
- +==================================================+ - | Stored Fields | - |==================================================| - +-----> | Stored Fields Index | - | |==================================================| - | | Inverted Text Index Section | - | |==================================================| - | | Vector Index Section | - | |==================================================| - | | Sections Info | - | |==================================================| - | +-> | Sections Index | - | | |========+========+====+=====+======+====+====+====| - | | | D# | SF | F | S | FDV | CF | V | CC | (Footer) - | | +========+====+===+====+==+==+======+====+====+====+ + +===================================================+ + | Stored Fields | + |===================================================| + +-----> | Stored Fields Index | + | |===================================================| + | | Inverted Text Index Section | + | |===================================================| + | | Vector Index Section | + | |===================================================| + | | Sections Info | + | |===================================================| + | +-> | Sections Index | + | | |========+========+=======+========+=======+========| + | | | D# | SF | S | CF | V | CC | (Footer) + | | +========+========+=======+========+=======+========+ | | | | +---------------------+ | |-----------------------------+ @@ -58,9 +58,7 @@ Footer section describes the configuration of particular ZAP file. The format of D#. Number of Docs. SF. Stored Fields Index Offset. - F. Field Index Offset. S. Sections Index Offset - FDV. Field DocValue Offset. CF. Chunk Factor. V. Version. CC. CRC32. 
@@ -124,7 +122,7 @@ In case of inverted text index, the dictionary is encoded in [Vellum](https://gi | | | Freq/Norm (chunked) | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | - | +->[ Freq | Norm (float32 under varint) ] | + | +->[ Freq | Norm (float32 under varint) ] | | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | | | | | +------------------------------------------------------------+ | From 2e1faac2d498afc01dd5fe1fd4d3814a12232288 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 01:37:50 +0530 Subject: [PATCH 07/23] small fix in zap.md --- zap.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zap.md b/zap.md index c9dc529d..843a4ad0 100644 --- a/zap.md +++ b/zap.md @@ -122,7 +122,7 @@ In case of inverted text index, the dictionary is encoded in [Vellum](https://gi | | | Freq/Norm (chunked) | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | - | +->[ Freq | Norm (float32 under varint) ] | + | +->[ Freq | Norm (float32 under varint) ] | | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | | | | | +------------------------------------------------------------+ | From 6b3309d1a3a227b82c57e0a373c76a5c240c6898 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 14:38:58 +0530 Subject: [PATCH 08/23] cleanup --- section_faiss_vector_index.go | 1 + section_inverted_text_index.go | 60 ++++++++++++++++++++++++---------- section_synonym_index.go | 12 ++++--- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index ce16756c..7928a8a0 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -773,6 +773,7 @@ func (v *vectorIndexOpaque) Reset() (err error) { v.vecFieldMap = nil v.vecIDMap = nil v.tmp0 = v.tmp0[:0] + v.updatedFields = nil atomic.StoreUint64(&v.bytesWritten, 0) diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index 0862b2e2..98316404 100644 --- a/section_inverted_text_index.go +++ 
b/section_inverted_text_index.go @@ -747,29 +747,48 @@ func (io *invertedIndexOpaque) process(field index.Field, fieldID uint16, docNum } } -func (i *invertedIndexOpaque) realloc() { - var pidNext int +func (i *invertedIndexOpaque) initDictsAndKeysFromFields() { + numFields := len(i.FieldsInv) - var totTFs int - var totLocs int - i.FieldsMap = map[string]uint16{} - - i.getOrDefineField("_id") // _id field is fieldID 0 + // Resize or allocate Dicts + if cap(i.Dicts) >= numFields { + i.Dicts = i.Dicts[:numFields] + } else { + i.Dicts = make([]map[string]uint64, numFields) + } - for _, result := range i.results { - result.VisitComposite(func(field index.CompositeField) { - i.getOrDefineField(field.Name()) - }) - result.VisitFields(func(field index.Field) { - i.getOrDefineField(field.Name()) - }) + // Resize or allocate DictKeys + if cap(i.DictKeys) >= numFields { + i.DictKeys = i.DictKeys[:numFields] + } else { + i.DictKeys = make([][]string, numFields) } - sort.Strings(i.FieldsInv[1:]) // keep _id as first field + for idx := 0; idx < numFields; idx++ { + // --- Dicts --- + if i.Dicts[idx] == nil { + i.Dicts[idx] = make(map[string]uint64) + } else { + clear(i.Dicts[idx]) + } - for fieldID, fieldName := range i.FieldsInv { - i.FieldsMap[fieldName] = uint16(fieldID + 1) + // --- DictKeys --- + if i.DictKeys[idx] != nil { + i.DictKeys[idx] = i.DictKeys[idx][:0] + } else { + i.DictKeys[idx] = nil + } } +} + +func (i *invertedIndexOpaque) realloc() { + var pidNext int + + var totTFs int + var totLocs int + + // initialize dicts and dict keys from fieldsMap + i.initDictsAndKeysFromFields() visitField := func(field index.Field, docNum int) { fieldID := uint16(i.getOrDefineField(field.Name())) @@ -935,7 +954,7 @@ func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int { func (i *invertedTextIndexSection) InitOpaque(args map[string]interface{}) resetable { rv := &invertedIndexOpaque{ fieldAddrs: map[int]int{}, - updatedFields: 
make(map[string]*index.UpdateFieldInfo), + updatedFields: map[string]*index.UpdateFieldInfo{}, } for k, v := range args { rv.Set(k, v) @@ -1059,6 +1078,9 @@ func (io *invertedIndexOpaque) Reset() (err error) { io.fieldsSame = false io.numDocs = 0 + clear(io.fieldAddrs) + clear(io.updatedFields) + return err } func (i *invertedIndexOpaque) Set(key string, val interface{}) { @@ -1071,6 +1093,8 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) { i.fieldsSame = val.(bool) case "fieldsMap": i.FieldsMap = val.(map[string]uint16) + case "fieldsInv": + i.FieldsInv = val.([]string) case "numDocs": i.numDocs = val.(uint64) case "updatedFields": diff --git a/section_synonym_index.go b/section_synonym_index.go index 3894d1ae..51df3f40 100644 --- a/section_synonym_index.go +++ b/section_synonym_index.go @@ -116,7 +116,8 @@ func (so *synonymIndexOpaque) Reset() (err error) { // cleanup stuff over here so.results = nil so.init = false - so.ThesaurusMap = nil + so.FieldsMap = nil + clear(so.ThesaurusMap) so.ThesaurusInv = so.ThesaurusInv[:0] for i := range so.Thesauri { so.Thesauri[i] = nil @@ -134,9 +135,10 @@ func (so *synonymIndexOpaque) Reset() (err error) { if so.builder != nil { err = so.builder.Reset(&so.builderBuf) } - so.FieldIDtoThesaurusID = nil + clear(so.FieldIDtoThesaurusID) so.SynonymTermToID = so.SynonymTermToID[:0] so.SynonymIDtoTerm = so.SynonymIDtoTerm[:0] + clear(so.thesaurusAddrs) so.tmp0 = so.tmp0[:0] return err @@ -176,8 +178,6 @@ func (so *synonymIndexOpaque) process(field index.SynonymField, fieldID uint16, func (so *synonymIndexOpaque) realloc() { var pidNext int var sidNext uint32 - so.ThesaurusMap = map[string]uint16{} - so.FieldIDtoThesaurusID = map[uint16]int{} // count the number of unique thesauri from the batch of documents for _, result := range so.results { @@ -398,7 +398,9 @@ func (s *synonymIndexSection) getSynonymIndexOpaque(opaque map[int]resetable) *s // results in the opaque before the section processes a synonym field. 
func (s *synonymIndexSection) InitOpaque(args map[string]interface{}) resetable { rv := &synonymIndexOpaque{ - thesaurusAddrs: map[int]int{}, + ThesaurusMap: map[string]uint16{}, + FieldIDtoThesaurusID: map[uint16]int{}, + thesaurusAddrs: map[int]int{}, } for k, v := range args { rv.Set(k, v) From d34dd949c6a418fdad1335adfb77ca6587f72222 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 15:52:51 +0530 Subject: [PATCH 09/23] Add fieldsOptions to manage indexing options for fields --- build.go | 5 +++-- merge.go | 25 +++++++++++++++++++++---- new.go | 27 +++++++++++++++++++++------ segment.go | 18 +++++++++++++++--- write.go | 10 +++++++++- 5 files changed, 69 insertions(+), 16 deletions(-) diff --git a/build.go b/build.go index ee3c3b09..1b30a732 100644 --- a/build.go +++ b/build.go @@ -171,8 +171,9 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), // following fields gets populated by loadFields - fieldsMap: make(map[string]uint16), - fieldsInv: make([]string, 0), + fieldsMap: make(map[string]uint16), + fieldsOptions: make(map[string]index.FieldIndexingOptions), + fieldsInv: make([]string, 0), } sb.updateSize() diff --git a/merge.go b/merge.go index e0fcaf4c..73210fed 100644 --- a/merge.go +++ b/merge.go @@ -121,6 +121,17 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf return fieldsInv[:idx] } +// Remove field options for fields that have been completely deleted +func filterFieldOptions(fieldOptions map[string]index.FieldIndexingOptions, + fieldsMap map[string]uint16) map[string]index.FieldIndexingOptions { + for field := range fieldOptions { + if _, ok := fieldsMap[field]; !ok { + delete(fieldOptions, field) + } + } + return fieldOptions +} + func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) ( newDocNums [][]uint64, 
numDocs, storedIndexOffset uint64, @@ -128,10 +139,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, err error) { var fieldsSame bool - fieldsSame, fieldsInv = mergeFields(segments) + var fieldsOptions map[string]index.FieldIndexingOptions + fieldsSame, fieldsInv, fieldsOptions = mergeFields(segments) updatedFields := mergeUpdatedFields(segments) fieldsInv = filterFields(fieldsInv, updatedFields) fieldsMap = mapFields(fieldsInv) + fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap) numDocs = computeNewDocCount(segments, drops) @@ -149,6 +162,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, "fieldsMap": fieldsMap, "numDocs": numDocs, "updatedFields": updatedFields, + "fieldsOptions": fieldsOptions, } if numDocs > 0 { @@ -171,7 +185,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, // we can persist the fields section index now, this will point // to the various indexes (each in different section) available for a field. 
- sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque) + sectionsIndexOffset, err = persistFieldsSection(fieldsInv, fieldsOptions, cr, mergeOpaque) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -597,7 +611,7 @@ func (sb *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint6 // input segments, and computes whether the fields are the same across // segments (which depends on fields to be sorted in the same way // across segments) -func mergeFields(segments []*SegmentBase) (bool, []string) { +func mergeFields(segments []*SegmentBase) (bool, []string, map[string]index.FieldIndexingOptions) { fieldsSame := true var segment0Fields []string @@ -606,10 +620,13 @@ func mergeFields(segments []*SegmentBase) (bool, []string) { } fieldsExist := map[string]struct{}{} + fieldOptions := map[string]index.FieldIndexingOptions{} for _, segment := range segments { fields := segment.Fields() for fieldi, field := range fields { fieldsExist[field] = struct{}{} + // record field options + fieldOptions[field] = segment.fieldsOptions[field] if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field { fieldsSame = false } @@ -627,7 +644,7 @@ func mergeFields(segments []*SegmentBase) (bool, []string) { sort.Strings(rv[1:]) // leave _id as first - return fieldsSame, rv + return fieldsSame, rv, fieldOptions } // Combine updateFieldInfo from all segments diff --git a/new.go b/new.go index c99b933d..ce80132c 100644 --- a/new.go +++ b/new.go @@ -102,6 +102,9 @@ type interim struct { // name -> field id + 1 FieldsMap map[string]uint16 + // FieldsOptions holds the indexing options for each field + FieldsOptions map[string]index.FieldIndexingOptions + // FieldsInv is the inverse of FieldsMap // field id -> name FieldsInv []string @@ -127,6 +130,9 @@ func (s *interim) reset() (err error) { for k := range s.FieldsMap { delete(s.FieldsMap, k) } + for k := range s.FieldsOptions { + delete(s.FieldsOptions, k) + } s.FieldsInv = s.FieldsInv[:0] 
s.metaBuf.Reset() s.tmp0 = s.tmp0[:0] @@ -173,15 +179,23 @@ func (s *interim) convert() (uint64, uint64, error) { if s.FieldsMap == nil { s.FieldsMap = map[string]uint16{} } + if s.FieldsOptions == nil { + s.FieldsOptions = map[string]index.FieldIndexingOptions{} + } s.getOrDefineField("_id") // _id field is fieldID 0 + var fName string for _, result := range s.results { result.VisitComposite(func(field index.CompositeField) { - s.getOrDefineField(field.Name()) + fName = field.Name() + s.getOrDefineField(fName) + s.FieldsOptions[fName] = field.Options() }) result.VisitFields(func(field index.Field) { + fName = field.Name() s.getOrDefineField(field.Name()) + s.FieldsOptions[fName] = field.Options() }) } @@ -192,10 +206,11 @@ func (s *interim) convert() (uint64, uint64, error) { } args := map[string]interface{}{ - "results": s.results, - "chunkMode": s.chunkMode, - "fieldsMap": s.FieldsMap, - "fieldsInv": s.FieldsInv, + "results": s.results, + "chunkMode": s.chunkMode, + "fieldsMap": s.FieldsMap, + "fieldsInv": s.FieldsInv, + "fieldsOptions": s.FieldsOptions, } if s.opaque == nil { s.opaque = map[int]resetable{} @@ -236,7 +251,7 @@ func (s *interim) convert() (uint64, uint64, error) { // we can persist a new fields section here // this new fields section will point to the various indexes available - sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque) + sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.FieldsOptions, s.w, s.opaque) if err != nil { return 0, 0, err } diff --git a/segment.go b/segment.go index 38e3b3a1..e2a6c698 100644 --- a/segment.go +++ b/segment.go @@ -54,6 +54,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { rv := &Segment{ SegmentBase: SegmentBase{ fieldsMap: make(map[string]uint16), + fieldsOptions: make(map[string]index.FieldIndexingOptions), invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), @@ -96,9 +97,10 @@ type 
SegmentBase struct { mem []byte memCRC uint32 chunkMode uint32 - fieldsMap map[string]uint16 // fieldName -> fieldID+1 - fieldsInv []string // fieldID -> fieldName - fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address + fieldsMap map[string]uint16 // fieldName -> fieldID+1 + fieldsOptions map[string]index.FieldIndexingOptions // fieldName -> fieldOptions + fieldsInv []string // fieldID -> fieldName + fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address numDocs uint64 storedIndexOffset uint64 sectionsIndexOffset uint64 @@ -128,6 +130,11 @@ func (sb *SegmentBase) updateSize() { sizeInBytes += (len(k) + SizeOfString) + SizeOfUint16 } + // fieldsOptions + for k := range sb.fieldsOptions { + sizeInBytes += (len(k) + SizeOfString) + SizeOfUint64 + } + // fieldsInv for _, entry := range sb.fieldsInv { sizeInBytes += len(entry) + SizeOfString @@ -341,8 +348,13 @@ func (sb *SegmentBase) loadField(fieldID uint16, pos uint64, fieldName := string(sb.mem[pos : pos+fieldNameLen]) pos += fieldNameLen + // read field options + fieldOptions, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(sz) + sb.fieldsInv = append(sb.fieldsInv, fieldName) sb.fieldsMap[fieldName] = uint16(fieldID + 1) + sb.fieldsOptions[fieldName] = index.FieldIndexingOptions(fieldOptions) fieldNumSections, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(sz) diff --git a/write.go b/write.go index 39382d05..a3c049ea 100644 --- a/write.go +++ b/write.go @@ -19,6 +19,7 @@ import ( "io" "github.com/RoaringBitmap/roaring/v2" + index "github.com/blevesearch/bleve_index_api" ) // writes out the length of the roaring bitmap in bytes as varint @@ -50,7 +51,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, return tw, nil } -func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) { +func persistFieldsSection(fieldsInv []string, fieldsOptions 
map[string]index.FieldIndexingOptions, w *CountHashWriter, opaque map[int]resetable) (uint64, error) { var rv uint64 fieldsOffsets := make([]uint64, 0, len(fieldsInv)) @@ -70,6 +71,13 @@ func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int return 0, err } + // write out the field options + fieldOpts := fieldsOptions[fieldName] + _, err = writeUvarints(w, uint64(fieldOpts)) + if err != nil { + return 0, err + } + // write out the number of field-specific indexes // FIXME hard-coding to 2, and not attempting to support sparseness well _, err = writeUvarints(w, uint64(len(segmentSections))) From afd61d6632e28151146836c728144eaf4fb9fd78 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 16:18:31 +0530 Subject: [PATCH 10/23] Refactor field name retrieval --- new.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/new.go b/new.go index ce80132c..f4515d0f 100644 --- a/new.go +++ b/new.go @@ -194,7 +194,7 @@ func (s *interim) convert() (uint64, uint64, error) { }) result.VisitFields(func(field index.Field) { fName = field.Name() - s.getOrDefineField(field.Name()) + s.getOrDefineField(fName) s.FieldsOptions[fName] = field.Options() }) } From 395dbed3c75329505a11250a4210377d63e9f819 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 19:44:39 +0530 Subject: [PATCH 11/23] Refactor field indexing logic to utilize fieldsOptions over updatedFields --- merge.go | 53 ++++++++++++++++++++++------------ section_faiss_vector_index.go | 20 +++++++------ section_inverted_text_index.go | 33 +++++++++++---------- 3 files changed, 62 insertions(+), 44 deletions(-) diff --git a/merge.go b/merge.go index 73210fed..fadee957 100644 --- a/merge.go +++ b/merge.go @@ -145,6 +145,10 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv = filterFields(fieldsInv, updatedFields) fieldsMap = mapFields(fieldsInv) fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap) + // fieldsSame cannot 
be true if fields were deleted + if len(updatedFields) > 0 { + fieldsSame = false + } numDocs = computeNewDocCount(segments, drops) @@ -161,13 +165,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, "fieldsSame": fieldsSame, "fieldsMap": fieldsMap, "numDocs": numDocs, - "updatedFields": updatedFields, "fieldsOptions": fieldsOptions, } if numDocs > 0 { storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields) + fieldsMap, fieldsInv, fieldsOptions, fieldsSame, numDocs, cr, closeCh) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -386,8 +389,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo type varintEncoder func(uint64) (int, error) func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, - fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, - w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) { + fieldsMap map[string]uint16, fieldsInv []string, + fieldsOptions map[string]index.FieldIndexingOptions, + fieldsSame bool, newSegDocCount uint64, + w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) { var rv [][]uint64 // The remapped or newDocNums for each segment. 
var newDocNum uint64 @@ -427,7 +432,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // segments and there are no deletions, via byte-copying // of stored docs bytes directly to the writer // cannot copy directly if fields might have been deleted - if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 { + if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) { err := segment.copyStoredDocs(newDocNum, docNumOffsets, w) if err != nil { return 0, nil, err @@ -470,8 +475,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // no entry for field in fieldsMap return false } - // early exit if the stored portion of the field is deleted - if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store { + // early exit if the store is not wanted for this field + if !fieldsOptions[field].IsStored() { return true } vals[fieldID] = append(vals[fieldID], value) @@ -505,8 +510,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // now walk the non-"_id" fields in order for fieldID := 1; fieldID < len(fieldsInv); fieldID++ { - // early exit if the stored portion of the field is deleted - if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store { + // early exit if no stored values for this field + if len(vals[fieldID]) == 0 { continue } storedFieldValues := vals[fieldID] @@ -625,8 +630,21 @@ func mergeFields(segments []*SegmentBase) (bool, []string, map[string]index.Fiel fields := segment.Fields() for fieldi, field := range fields { fieldsExist[field] = struct{}{} - // record field options - fieldOptions[field] = segment.fieldsOptions[field] + + if prev, ok := fieldOptions[field]; ok { + // Merge options conservatively: once a field option is disabled (bit cleared) + // in any segment, it remains disabled. This ensures deterministic behavior + // when options can only transition from true -> false. 
+ fieldOptions[field] = prev & segment.fieldsOptions[field] + // check if any bits were cleared + if fieldOptions[field] != prev { + // Some bits were cleared (option changed from true -> false) + fieldsSame = false + } + } else { + // first occurrence of the field + fieldOptions[field] = segment.fieldsOptions[field] + } if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field { fieldsSame = false } @@ -656,18 +674,15 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn if fieldInfo == nil { fieldInfo = make(map[string]*index.UpdateFieldInfo) } + // if field not present, add it if _, ok := fieldInfo[field]; !ok { fieldInfo[field] = &index.UpdateFieldInfo{ - Deleted: info.Deleted, - Index: info.Index, - Store: info.Store, - DocValues: info.DocValues, + // mark whether field is deleted in any segment + Deleted: info.Deleted, } } else { - fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted - fieldInfo[field].Index = fieldInfo[field].Index || info.Index - fieldInfo[field].Store = fieldInfo[field].Store || info.Store - fieldInfo[field].DocValues = fieldInfo[field].Store || info.DocValues + // in case of conflict (mark field deleted if latest segment marks it deleted) + fieldInfo[field].Deleted = info.Deleted } } diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 7928a8a0..c53a3b7a 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -105,8 +105,8 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se if _, ok := sb.fieldsMap[fieldName]; !ok { continue } - // early exit if index data is supposed to be deleted - if info, ok := vo.updatedFields[fieldName]; ok && info.Index { + // early exit if field is is not required to be indexed + if !vo.fieldsOptions[fieldName].IsIndexed() { continue } @@ -690,10 +690,9 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable) func (v *faissVectorIndexSection) 
InitOpaque(args map[string]interface{}) resetable { rv := &vectorIndexOpaque{ - fieldAddrs: make(map[uint16]int), - vecIDMap: make(map[int64]*vecInfo), - vecFieldMap: make(map[uint16]*indexContent), - updatedFields: make(map[string]*index.UpdateFieldInfo), + fieldAddrs: make(map[uint16]int), + vecIDMap: make(map[int64]*vecInfo), + vecFieldMap: make(map[uint16]*indexContent), } for k, v := range args { rv.Set(k, v) @@ -732,7 +731,8 @@ type vectorIndexOpaque struct { // index to be build. vecFieldMap map[uint16]*indexContent - updatedFields map[string]*index.UpdateFieldInfo + // field indexing options + fieldsOptions map[string]index.FieldIndexingOptions tmp0 []byte } @@ -775,6 +775,8 @@ func (v *vectorIndexOpaque) Reset() (err error) { v.tmp0 = v.tmp0[:0] v.updatedFields = nil + v.fieldsOptions = nil + atomic.StoreUint64(&v.bytesWritten, 0) return nil @@ -782,7 +784,7 @@ func (v *vectorIndexOpaque) Reset() (err error) { func (v *vectorIndexOpaque) Set(key string, val interface{}) { switch key { - case "updatedFields": - v.updatedFields = val.(map[string]*index.UpdateFieldInfo) + case "fieldsOptions": + v.fieldsOptions = val.(map[string]index.FieldIndexingOptions) } } diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index 98316404..b6d6fd6c 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -80,9 +80,8 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI } func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap, - fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool, - newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32, - updatedFields map[string]*index.UpdateFieldInfo, w *CountHashWriter, + fieldsInv []string, fieldsMap map[string]uint16, fieldsOptions map[string]index.FieldIndexingOptions, + fieldsSame bool, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32, w *CountHashWriter, closeCh chan struct{}) 
(map[int]int, uint64, error) { var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) var bufLoc []uint64 @@ -126,8 +125,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. if isClosed(closeCh) { return nil, 0, seg.ErrClosed } - // early exit if index data is supposed to be deleted - if info, ok := updatedFields[fieldName]; ok && info.Index { + // early exit if the field's index option is false + if !fieldsOptions[fieldName].IsIndexed() { continue } @@ -249,8 +248,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. postItr = postings.iterator(true, true, true, postItr) - // can only safely copy data if no field data has been deleted - if fieldsSame && len(updatedFields) == 0 { + // can only safely copy data if all segments have same fields + if fieldsSame { // can optimize by copying freq/norm/loc bytes directly lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying( term, postItr, newDocNums[itrI], newRoaring, @@ -323,8 +322,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. 
if isClosed(closeCh) { return nil, 0, seg.ErrClosed } - // early exit if docvalues data is supposed to be deleted - if info, ok := updatedFields[fieldName]; ok && info.DocValues { + // early exit if docvalues are not wanted for this field + if !fieldsOptions[fieldName].IncludeDocValues() { continue } fieldIDPlus1 := uint16(segment.fieldsMap[fieldName]) @@ -407,7 +406,7 @@ func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*S w *CountHashWriter, closeCh chan struct{}) error { io := i.getInvertedIndexOpaque(opaque) fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv, - io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh) + io.FieldsMap, io.FieldsOptions, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh) if err != nil { return err } @@ -953,8 +952,7 @@ func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int { func (i *invertedTextIndexSection) InitOpaque(args map[string]interface{}) resetable { rv := &invertedIndexOpaque{ - fieldAddrs: map[int]int{}, - updatedFields: map[string]*index.UpdateFieldInfo{}, + fieldAddrs: map[int]int{}, } for k, v := range args { rv.Set(k, v) @@ -981,6 +979,10 @@ type invertedIndexOpaque struct { // field id -> name FieldsInv []string + // Field indexing options + // field name -> options + FieldsOptions map[string]index.FieldIndexingOptions + // Term dictionaries for each field // field id -> term -> postings list id + 1 Dicts []map[string]uint64 @@ -1023,8 +1025,6 @@ type invertedIndexOpaque struct { fieldAddrs map[int]int - updatedFields map[string]*index.UpdateFieldInfo - fieldsSame bool numDocs uint64 } @@ -1035,6 +1035,7 @@ func (io *invertedIndexOpaque) Reset() (err error) { io.init = false io.chunkMode = 0 io.FieldsMap = nil + io.FieldsOptions = nil io.FieldsInv = nil for i := range io.Dicts { io.Dicts[i] = nil @@ -1093,11 +1094,11 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) { 
i.fieldsSame = val.(bool) case "fieldsMap": i.FieldsMap = val.(map[string]uint16) + case "fieldsOptions": + i.FieldsOptions = val.(map[string]index.FieldIndexingOptions) case "fieldsInv": i.FieldsInv = val.([]string) case "numDocs": i.numDocs = val.(uint64) - case "updatedFields": - i.updatedFields = val.(map[string]*index.UpdateFieldInfo) } } From f1a3e7763253b8bef9d439be976f76b941c09a4e Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 21:08:48 +0530 Subject: [PATCH 12/23] md changes --- zap.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/zap.md b/zap.md index 843a4ad0..aa1afda0 100644 --- a/zap.md +++ b/zap.md @@ -99,16 +99,17 @@ Sections Index is a set of NF uint64 addresses (0 through F# - 1) each of which + Sections Info + Sections Index + |============================================================================|=====================================| | | | - | +---------+---------+-----+---------+---------+~~~~~~~~+~~~~~~~~+--+...+-+ | +-------+--------+...+------+-----+ | - +----> S1 Addr | S1 Type | ... | Sn Addr | Sn Type | NS | Length | Name | | | 0 | 1 | | F#-1 | NF | | - | | +---------+---------+-----+---------+---------+~~~~~~~~+~~~~~~~~+--+...+-+ | +-------+----+---+...+------+-----+ | + | +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+--------+...+------+-----+ | + +----> S1 Addr | S1 Type | ... | Sn Addr | Sn Type | NS | O | Name | Length | | | 0 | 1 | | F#-1 | NF | | + | | +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+----+---+...+------+-----+ | | | | | | | +============================================================================+==============|======================+ | | - +----------------------------------------------------------------------------------------------+ + +----------------------------------------------------,------------------------------------------+ NF. Number of fields NS. 
Number of index sections + O. Field Indexing Options Sn. nth index section ## Inverted Text Index Section From b1a32aa7f1b1f5054f61bd354d8823349e32b093 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 01:58:40 +0530 Subject: [PATCH 13/23] pass test --- merge.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/merge.go b/merge.go index fadee957..287fe1ed 100644 --- a/merge.go +++ b/merge.go @@ -121,12 +121,33 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf return fieldsInv[:idx] } -// Remove field options for fields that have been completely deleted -func filterFieldOptions(fieldOptions map[string]index.FieldIndexingOptions, - fieldsMap map[string]uint16) map[string]index.FieldIndexingOptions { - for field := range fieldOptions { - if _, ok := fieldsMap[field]; !ok { - delete(fieldOptions, field) +// Remove field options for fields using updateFieldInfo to override the +// options selected during mergeFields, if needed. This is mainly +// for the case where there is a field option update which has not been +// propagated yet, because a new segment has not been created yet. 
+func finalizeFieldOptions(fieldOptions map[string]index.FieldIndexingOptions, + updatedFields map[string]*index.UpdateFieldInfo) map[string]index.FieldIndexingOptions { + for field, opts := range fieldOptions { + if info, ok := updatedFields[field]; ok { + // if field is deleted, remove its options + if info.Deleted { + delete(fieldOptions, field) + continue + } + // otherwise, update options based on info + if info.Index { + // ensure indexing is disabled + opts &^= index.IndexField + } + if info.Store { + // ensure storing is disabled + opts &^= index.StoreField + } + if info.DocValues { + // ensure doc values is disabled + opts &^= index.DocValues + } + fieldOptions[field] = opts } } return fieldOptions @@ -144,7 +165,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, updatedFields := mergeUpdatedFields(segments) fieldsInv = filterFields(fieldsInv, updatedFields) fieldsMap = mapFields(fieldsInv) - fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap) + fieldsOptions = finalizeFieldOptions(fieldsOptions, updatedFields) // fieldsSame cannot be true if fields were deleted if len(updatedFields) > 0 { fieldsSame = false @@ -678,11 +699,16 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn if _, ok := fieldInfo[field]; !ok { fieldInfo[field] = &index.UpdateFieldInfo{ // mark whether field is deleted in any segment - Deleted: info.Deleted, + Deleted: info.Deleted, + Index: info.Index, + Store: info.Store, + DocValues: info.DocValues, } } else { - // in case of conflict (mark field deleted if latest segment marks it deleted) - fieldInfo[field].Deleted = info.Deleted + fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted + fieldInfo[field].Index = fieldInfo[field].Index || info.Index + fieldInfo[field].Store = fieldInfo[field].Store || info.Store + fieldInfo[field].DocValues = fieldInfo[field].DocValues || info.DocValues } } From 46d5600f73c14f4c2d81181556f23306b4b277ac Mon Sep 17 00:00:00 
2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 14:45:28 +0530 Subject: [PATCH 14/23] fix merge conflict --- section_faiss_vector_index.go | 1 - section_inverted_text_index.go | 1 - 2 files changed, 2 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index c53a3b7a..8610563b 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -773,7 +773,6 @@ func (v *vectorIndexOpaque) Reset() (err error) { v.vecFieldMap = nil v.vecIDMap = nil v.tmp0 = v.tmp0[:0] - v.updatedFields = nil v.fieldsOptions = nil diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index b6d6fd6c..2d943f87 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -1080,7 +1080,6 @@ func (io *invertedIndexOpaque) Reset() (err error) { io.numDocs = 0 clear(io.fieldAddrs) - clear(io.updatedFields) return err } From 0e2346510a6125f5fd2db359fdea9ee708bd4eab Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 6 Nov 2025 12:55:15 +0530 Subject: [PATCH 15/23] Support GPU-Accelerated Vector Search --- faiss_vector_gpu.go | 192 ++++++++++++++++++++++++++++++++++ faiss_vector_gpu_test.go | 39 +++++++ section_faiss_vector_index.go | 68 +++++++++--- 3 files changed, 284 insertions(+), 15 deletions(-) create mode 100644 faiss_vector_gpu.go create mode 100644 faiss_vector_gpu_test.go diff --git a/faiss_vector_gpu.go b/faiss_vector_gpu.go new file mode 100644 index 00000000..edee8f1b --- /dev/null +++ b/faiss_vector_gpu.go @@ -0,0 +1,192 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "math" + "math/rand/v2" + "sync" + + faiss "github.com/blevesearch/go-faiss" +) + +var ( + // NumGPUs is the number of available GPU devices + NumGPUs int + // GPULocks is a slice of mutexes for synchronizing access to GPU resources + // Primarily used for synchronizing calls to TransferToGpu and TransferFromGpu + GPULocks []*sync.Mutex +) + +func init() { + n, err := faiss.NumGPUs() + if err != nil { + NumGPUs = 0 + return + } + NumGPUs = n + GPULocks = make([]*sync.Mutex, NumGPUs) + for i := 0; i < NumGPUs; i++ { + GPULocks[i] = &sync.Mutex{} + } +} + +const ( + // GPUIndexMinVectorsForTransfer is the minimum number of vectors + // required to consider transferring an IVF index to GPU for training + // Smaller indexes may not benefit from GPU acceleration + // due to transfer overheads + GPUIndexMinVectorsForTransfer = 30000 + + // GPUTransferOverheadFactor accounts for additional + // memory overhead involved during GPU index transfer, + // such as data transfer costs and temporary allocations + GPUTransferOverheadFactor = 1.4 + + // SoftmaxTemperature controls the sharpness of the probability distribution + // when selecting a GPU based on available free memory. 
+ // Lower values (<1) make the selection more deterministic (favoring the GPU + // with the most free memory), while higher values (>1) make it more random + SoftmaxTemperature = 0.5 +) + +type gpuInfo struct { + id int + freeMem float64 // in bytes +} + +// returns a device ID between 0 and NumGPUs-1 (inclusive) +// to be used for distributing work across multiple GPUs +// returns -1 if no GPUs are available or an error occurs +// The selection algorithm favors GPUs with more free memory, +// using a softmax-based probabilistic selection to help +// spread load across multiple GPUs when more than two GPUs are available +func GetDeviceID() int { + // simple cases + // no GPUs available, return -1 + if NumGPUs == 0 { + return -1 + } + + var gpus []*gpuInfo + for i := 0; i < NumGPUs; i++ { + freeMem, err := faiss.FreeMemory(i) + if err != nil { + continue + } + gpus = append(gpus, &gpuInfo{id: i, freeMem: float64(freeMem)}) + } + + if len(gpus) == 0 { + // fallback if no memory info available + // return -1 to indicate no GPUs available + // even though NumGPUs > 0 as we couldn't + // get memory info and thus cannot provide a + // reliable device selection + return -1 + } + + var maxGPU *gpuInfo + for _, g := range gpus { + if maxGPU == nil || g.freeMem > maxGPU.freeMem { + maxGPU = g + } + } + + // if only two or fewer GPUs, just pick the one with the most free memory + if len(gpus) <= 2 { + return maxGPU.id + } + + // more than two GPUs, do softmax weighting based on free memory + // to probabilistically pick a GPU, favoring those with more free memory + // this helps spread load more evenly across multiple GPUs, while still + // favoring those with more available resources, mainly to avoid + // always picking the same GPU when multiple GPUs have similar free memory + expVals := make([]float64, len(gpus)) + var sumExp float64 + for i, g := range gpus { + val := math.Exp((g.freeMem - maxGPU.freeMem) / (SoftmaxTemperature * maxGPU.freeMem)) + expVals[i] = val + 
sumExp += val + } + + // Compute cumulative distribution and sample. + r := rand.Float64() + cumProb := 0.0 + for i, g := range gpus { + cumProb += expVals[i] / sumExp + if r <= cumProb { + return g.id + } + } + + // Fallback to max GPU + return maxGPU.id +} + +func TrainIndex(index *faiss.IndexImpl, vecs []float32, dims int, useGPU bool) (*faiss.IndexImpl, error) { + // function to train index on CPU used as fallback on GPU failures + TrainCPU := func() (*faiss.IndexImpl, error) { + err := index.Train(vecs) + return index, err + } + // decide whether to use GPU for training + if !useGPU || NumGPUs == 0 || len(vecs)/dims < GPUIndexMinVectorsForTransfer { + // use CPU training + return TrainCPU() + } + // attempt GPU training + deviceID := GetDeviceID() + if deviceID == -1 { + // no GPUs available, fallback to CPU training + return TrainCPU() + } + // lock the selected GPU for the duration of the transfer and training + GPULocks[deviceID].Lock() + defer GPULocks[deviceID].Unlock() + // check if enough free memory is available + estimatedMemNeeded := uint64(float64(len(vecs)*SizeOfFloat32) * GPUTransferOverheadFactor) // input vectors + overhead + freeMem, err := faiss.FreeMemory(deviceID) + if err != nil || freeMem < estimatedMemNeeded { + // unable to get free memory info or not enough free memory, + // fallback to CPU training + return TrainCPU() + } + // transfer index to GPU + gpuIndex, err := faiss.TransferToGPU(index, deviceID) + if err != nil { + // transfer failed, fallback to CPU training + return TrainCPU() + } + // train on GPU + err = gpuIndex.Train(vecs) + if err != nil { + // training failed, fallback to CPU training + gpuIndex.Close() + return TrainCPU() + } + // transfer back to CPU + cpuIndex, err := faiss.TransferToCPU(gpuIndex) + gpuIndex.Close() + if err != nil { + // transfer back failed, fallback to CPU training + return TrainCPU() + } + // successful GPU training and transfer back to CPU + // now free the original index and return the new 
trained index + index.Close() + return cpuIndex, nil +} diff --git a/faiss_vector_gpu_test.go b/faiss_vector_gpu_test.go new file mode 100644 index 00000000..32d723c1 --- /dev/null +++ b/faiss_vector_gpu_test.go @@ -0,0 +1,39 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "testing" + + "github.com/blevesearch/go-faiss" +) + +func TestNumGPUs(t *testing.T) { + _, err := faiss.NumGPUs() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestGetDeviceID(t *testing.T) { + id := GetDeviceID() + if NumGPUs == 0 { + if id != -1 { + t.Fatalf("expected -1 device ID when no GPUs available, got: %d", id) + } + } else if id < 0 || id >= NumGPUs { + t.Fatalf("expected device ID between 0 and %d, got: %d", NumGPUs-1, id) + } +} diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 8610563b..b1fac198 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -75,6 +75,7 @@ type vecIndexInfo struct { indexSize uint64 vecIds []int64 indexOptimizedFor string + indexOptions index.VectorIndexOptions index *faiss.IndexImpl } @@ -130,6 +131,10 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se indexOptimizationTypeInt, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n + // the vector index options represented as a uint64 + vectorIndexOptionsInt, n := binary.Uvarint(sb.mem[pos : 
pos+binary.MaxVarintLen64]) + pos += n + numVecs, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n @@ -137,6 +142,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se indexes = append(indexes, &vecIndexInfo{ vecIds: make([]int64, 0, numVecs), indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)], + indexOptions: index.VectorIndexOptions(vectorIndexOptionsInt), }) curIdx := len(indexes) - 1 @@ -279,6 +285,7 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // that they are extracted from the field mapping info. var dims, metric int var indexOptimizedFor string + var indexOptions index.VectorIndexOptions var validMerge bool var finalVecIDCap, indexDataCap, reconsCap int @@ -317,6 +324,7 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, dims = index.D() metric = int(index.MetricType()) indexOptimizedFor = vecIndexes[segI].indexOptimizedFor + indexOptions = vecIndexes[segI].indexOptions } // not a valid merge operation as there are no valid indexes to merge. @@ -383,14 +391,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, defer faissIndex.Close() if indexClass == IndexTypeIVF { - // the direct map maintained in the IVF index is essential for the - // reconstruction of vectors based on vector IDs in the future merges. - // the AddWithIDs API also needs a direct map to be set before using. - err = faissIndex.SetDirectMap(2) - if err != nil { - return err - } - nprobe := calculateNprobe(nlist, indexOptimizedFor) faissIndex.SetNProbe(nprobe) @@ -398,7 +398,19 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // the data space of indexData such that during the search time, we probe // only a subset of vectors -> non-exhaustive search. could be a time // consuming step when the indexData is large. 
- err = faissIndex.Train(indexData) + // Best effort attempt to use GPU for training if configured + // May fall back to CPU training if GPU training fails due + // to any reason such as insufficient memory, or unavailability + // of GPUs, etc. + faissIndex, err = TrainIndex(faissIndex, indexData, dims, indexOptions.UseGPU()) + if err != nil { + return err + } + + // the direct map maintained in the IVF index is essential for the + // reconstruction of vectors based on vector IDs in the future merges. + // the AddWithIDs API also needs a direct map to be set before using. + err = faissIndex.SetDirectMap(2) if err != nil { return err } @@ -488,7 +500,8 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint for fieldID, content := range vo.vecFieldMap { // calculate the capacity of the vecs and ids slices // to avoid multiple allocations. - vecs := make([]float32, 0, len(content.vecs)*int(content.dim)) + dims := int(content.dim) + vecs := make([]float32, 0, len(content.vecs)*dims) ids := make([]int64, 0, len(content.vecs)) for hash, vecInfo := range content.vecs { vecs = append(vecs, vecInfo.vec...) @@ -501,6 +514,10 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // use the same FAISS metric for inner product and cosine similarity metric = faiss.MetricInnerProduct } + // read vector index options: + // was this field configured + // to use GPU for indexing? 
+ useGPU := content.options.UseGPU() nvecs := len(ids) nlist := determineCentroids(nvecs) @@ -514,15 +531,18 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint defer faissIndex.Close() if indexClass == IndexTypeIVF { - err = faissIndex.SetDirectMap(2) + nprobe := calculateNprobe(nlist, content.indexOptimizedFor) + faissIndex.SetNProbe(nprobe) + // Best effort attempt to use GPU for training if configured + // May fall back to CPU training if GPU training fails due + // to any reason such as insufficient memory, or unavailability + // of GPUs, etc. + faissIndex, err = TrainIndex(faissIndex, vecs, dims, useGPU) if err != nil { return 0, err } - nprobe := calculateNprobe(nlist, content.indexOptimizedFor) - faissIndex.SetNProbe(nprobe) - - err = faissIndex.Train(vecs) + err = faissIndex.SetDirectMap(2) if err != nil { return 0, err } @@ -553,6 +573,12 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint return 0, err } + n = binary.PutUvarint(tempBuf, uint64(content.options)) + _, err = w.Write(tempBuf[:n]) + if err != nil { + return 0, err + } + // write the number of unique vectors n = binary.PutUvarint(tempBuf, uint64(faissIndex.Ntotal())) _, err = w.Write(tempBuf[:n]) @@ -626,6 +652,16 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do metric := field.Similarity() indexOptimizedFor := field.IndexOptimizedFor() + // read any vector index options here + useGPU := field.GPU() + + // create vector index options to use + var options index.VectorIndexOptions + + if useGPU { + options |= index.FlagUseGPU + } + // caller is supposed to make sure len(vec) is a multiple of dim. // Not double checking it here to avoid the overhead. 
numSubVecs := len(vec) / dim @@ -652,6 +688,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do dim: uint16(dim), metric: metric, indexOptimizedFor: indexOptimizedFor, + options: options, } } else { vo.vecFieldMap[fieldID].vecs[subVecHash] = &vecInfo{ @@ -706,6 +743,7 @@ type indexContent struct { dim uint16 metric string indexOptimizedFor string + options index.VectorIndexOptions } type vecInfo struct { From 424119252da615c3d897f55118dfb5f06b3fd743 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 6 Nov 2025 21:05:32 +0530 Subject: [PATCH 16/23] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- faiss_vector_gpu.go | 15 ++++++++------- section_faiss_vector_index.go | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/faiss_vector_gpu.go b/faiss_vector_gpu.go index edee8f1b..79ebdb54 100644 --- a/faiss_vector_gpu.go +++ b/faiss_vector_gpu.go @@ -26,7 +26,7 @@ var ( // NumGPUs is the number of available GPU devices NumGPUs int // GPULocks is a slice of mutexes for synchronizing access to GPU resources - // Primarily used for synchronizing calls to TransferToGpu and TransferFromGpu + // Primarily used for synchronizing calls to TransferToGPU and TransferToCPU GPULocks []*sync.Mutex ) @@ -67,12 +67,9 @@ type gpuInfo struct { freeMem float64 // in bytes } -// returns a device ID between 0 and NumGPUs-1 (inclusive) -// to be used for distributing work across multiple GPUs -// returns -1 if no GPUs are available or an error occurs -// The selection algorithm favors GPUs with more free memory, -// using a softmax-based probabilistic selection to help -// spread load across multiple GPUs when more than two GPUs are available +// GetDeviceID returns a device ID between 0 and NumGPUs-1 (inclusive) to be used for distributing work across multiple GPUs. +// It returns -1 if no GPUs are available or an error occurs. 
The selection algorithm favors GPUs with more free memory, +// using a softmax-based probabilistic selection to help spread load across multiple GPUs when more than two GPUs are available. func GetDeviceID() int { // simple cases // no GPUs available, return -1 @@ -137,6 +134,10 @@ func GetDeviceID() int { return maxGPU.id } +// TrainIndex trains the given FAISS index using the provided vectors and dimensions. +// If useGPU is true and a suitable GPU is available, training is performed on the GPU; +// otherwise, training falls back to the CPU. The function returns the trained index, +// which may be a new instance if GPU training and transfer back to CPU succeed. func TrainIndex(index *faiss.IndexImpl, vecs []float32, dims int, useGPU bool) (*faiss.IndexImpl, error) { // function to train index on CPU used as fallback on GPU failures TrainCPU := func() (*faiss.IndexImpl, error) { diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index b1fac198..ee6e4951 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -516,7 +516,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint } // read vector index options: // was this field configured - // to use GPU for indexing? + // to use GPU for training? 
useGPU := content.options.UseGPU() nvecs := len(ids) From a6aca225905bf241ed7c065e2f1e9a963cd7be90 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 6 Nov 2025 21:13:27 +0530 Subject: [PATCH 17/23] fix divByZero --- faiss_vector_gpu.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/faiss_vector_gpu.go b/faiss_vector_gpu.go index 79ebdb54..3e994220 100644 --- a/faiss_vector_gpu.go +++ b/faiss_vector_gpu.go @@ -102,6 +102,11 @@ func GetDeviceID() int { } } + // if all the GPUs are full, return -1 + if maxGPU.freeMem == 0 { + return -1 + } + // if only two or fewer GPUs, just pick the one with the most free memory if len(gpus) <= 2 { return maxGPU.id From 857cf026e3fc6b80b6e62683a75c047b6d29acc3 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 6 Nov 2025 21:33:12 +0530 Subject: [PATCH 18/23] fix bug --- section_faiss_vector_index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index ee6e4951..6789448e 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -388,7 +388,7 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, if err != nil { return err } - defer faissIndex.Close() + defer func() { faissIndex.Close() }() if indexClass == IndexTypeIVF { nprobe := calculateNprobe(nlist, indexOptimizedFor) @@ -528,7 +528,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint return 0, err } - defer faissIndex.Close() + defer func() { faissIndex.Close() }() if indexClass == IndexTypeIVF { nprobe := calculateNprobe(nlist, content.indexOptimizedFor) From 21eafbe7cc8b765343ad7d9c7c53ec0b5ea55c12 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 6 Nov 2025 21:34:22 +0530 Subject: [PATCH 19/23] add a nil check --- section_faiss_vector_index.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 
6789448e..640875cb 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -388,7 +388,11 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, if err != nil { return err } - defer func() { faissIndex.Close() }() + defer func() { + if faissIndex != nil { + faissIndex.Close() + } + }() if indexClass == IndexTypeIVF { nprobe := calculateNprobe(nlist, indexOptimizedFor) @@ -528,7 +532,11 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint return 0, err } - defer func() { faissIndex.Close() }() + defer func() { + if faissIndex != nil { + faissIndex.Close() + } + }() if indexClass == IndexTypeIVF { nprobe := calculateNprobe(nlist, content.indexOptimizedFor) From 83276a07ffdbc244518933566da09db65116d969 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Fri, 7 Nov 2025 14:56:03 +0530 Subject: [PATCH 20/23] use new options --- section_faiss_vector_index.go | 39 ++++++++++++++++------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 640875cb..d07fe21a 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -75,7 +75,7 @@ type vecIndexInfo struct { indexSize uint64 vecIds []int64 indexOptimizedFor string - indexOptions index.VectorIndexOptions + indexOptions index.FieldIndexingOptions index *faiss.IndexImpl } @@ -127,12 +127,12 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se _, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n - // the index optimization type represented as an int - indexOptimizationTypeInt, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + // the field indexing options represented as a uint64 + fieldIndexingOptionsInt, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n - // the vector index options represented as a uint64 - vectorIndexOptionsInt, n := 
binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + // the index optimization type represented as an int + indexOptimizationTypeInt, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n numVecs, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) @@ -142,7 +142,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se indexes = append(indexes, &vecIndexInfo{ vecIds: make([]int64, 0, numVecs), indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)], - indexOptions: index.VectorIndexOptions(vectorIndexOptionsInt), + indexOptions: index.FieldIndexingOptions(fieldIndexingOptionsInt), }) curIdx := len(indexes) - 1 @@ -211,6 +211,12 @@ func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter if err != nil { return err } + // write out the field indexing options + n = binary.PutUvarint(tempBuf, uint64(indexes[0].indexOptions)) + _, err = w.Write(tempBuf[:n]) + if err != nil { + return err + } n = binary.PutUvarint(tempBuf, uint64(index.SupportedVectorIndexOptimizations[indexes[0].indexOptimizedFor])) _, err = w.Write(tempBuf[:n]) @@ -285,7 +291,7 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // that they are extracted from the field mapping info. 
var dims, metric int var indexOptimizedFor string - var indexOptions index.VectorIndexOptions + var indexOptions index.FieldIndexingOptions var validMerge bool var finalVecIDCap, indexDataCap, reconsCap int @@ -574,14 +580,14 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint if err != nil { return 0, err } - - n = binary.PutUvarint(tempBuf, uint64(index.SupportedVectorIndexOptimizations[content.indexOptimizedFor])) + // write out the field indexing options + n = binary.PutUvarint(tempBuf, uint64(content.options)) _, err = w.Write(tempBuf[:n]) if err != nil { return 0, err } - n = binary.PutUvarint(tempBuf, uint64(content.options)) + n = binary.PutUvarint(tempBuf, uint64(index.SupportedVectorIndexOptimizations[content.indexOptimizedFor])) _, err = w.Write(tempBuf[:n]) if err != nil { return 0, err @@ -659,16 +665,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do dim := field.Dims() metric := field.Similarity() indexOptimizedFor := field.IndexOptimizedFor() - - // read any vector index options here - useGPU := field.GPU() - - // create vector index options to use - var options index.VectorIndexOptions - - if useGPU { - options |= index.FlagUseGPU - } + options := field.Options() // caller is supposed to make sure len(vec) is a multiple of dim. // Not double checking it here to avoid the overhead. 
@@ -751,7 +748,7 @@ type indexContent struct { dim uint16 metric string indexOptimizedFor string - options index.VectorIndexOptions + options index.FieldIndexingOptions } type vecInfo struct { From 074e13f2c4336d048771d9cedef50cbc99d1128d Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 18:38:42 +0530 Subject: [PATCH 21/23] do not write options --- section_faiss_vector_index.go | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index d07fe21a..d6664d2d 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -75,7 +75,6 @@ type vecIndexInfo struct { indexSize uint64 vecIds []int64 indexOptimizedFor string - indexOptions index.FieldIndexingOptions index *faiss.IndexImpl } @@ -127,10 +126,6 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se _, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n - // the field indexing options represented as a uint64 - fieldIndexingOptionsInt, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) - pos += n - // the index optimization type represented as an int indexOptimizationTypeInt, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n @@ -142,7 +137,6 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se indexes = append(indexes, &vecIndexInfo{ vecIds: make([]int64, 0, numVecs), indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)], - indexOptions: index.FieldIndexingOptions(fieldIndexingOptionsInt), }) curIdx := len(indexes) - 1 @@ -211,12 +205,6 @@ func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter if err != nil { return err } - // write out the field indexing options - n = binary.PutUvarint(tempBuf, uint64(indexes[0].indexOptions)) - _, err = w.Write(tempBuf[:n]) - if err != nil { - return err - } n = 
binary.PutUvarint(tempBuf, uint64(index.SupportedVectorIndexOptimizations[indexes[0].indexOptimizedFor])) _, err = w.Write(tempBuf[:n]) @@ -330,7 +318,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, dims = index.D() metric = int(index.MetricType()) indexOptimizedFor = vecIndexes[segI].indexOptimizedFor - indexOptions = vecIndexes[segI].indexOptions } // not a valid merge operation as there are no valid indexes to merge. @@ -527,7 +514,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // read vector index options: // was this field configured // to use GPU for training? - useGPU := content.options.UseGPU() + useGPU := vo.fieldsOptions nvecs := len(ids) nlist := determineCentroids(nvecs) @@ -551,7 +538,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // May fall back to CPU training if GPU training fails due // to any reason such as insufficient memory, or unavailability // of GPUs, etc. - faissIndex, err = TrainIndex(faissIndex, vecs, dims, useGPU) + faissIndex, err = TrainIndex(faissIndex, vecs, dims) if err != nil { return 0, err } @@ -580,12 +567,6 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint if err != nil { return 0, err } - // write out the field indexing options - n = binary.PutUvarint(tempBuf, uint64(content.options)) - _, err = w.Write(tempBuf[:n]) - if err != nil { - return 0, err - } n = binary.PutUvarint(tempBuf, uint64(index.SupportedVectorIndexOptimizations[content.indexOptimizedFor])) _, err = w.Write(tempBuf[:n]) @@ -665,7 +646,6 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do dim := field.Dims() metric := field.Similarity() indexOptimizedFor := field.IndexOptimizedFor() - options := field.Options() // caller is supposed to make sure len(vec) is a multiple of dim. // Not double checking it here to avoid the overhead. 
@@ -693,7 +673,6 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do dim: uint16(dim), metric: metric, indexOptimizedFor: indexOptimizedFor, - options: options, } } else { vo.vecFieldMap[fieldID].vecs[subVecHash] = &vecInfo{ @@ -748,7 +727,7 @@ type indexContent struct { dim uint16 metric string indexOptimizedFor string - options index.FieldIndexingOptions + gpu bool } type vecInfo struct { From 00eeefc6bfef272a0c15ebe1c0254a0c0cf30fd8 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 19:08:14 +0530 Subject: [PATCH 22/23] fix --- section_faiss_vector_index.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index d6664d2d..194a902e 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -174,7 +174,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se if err != nil { return err } - err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, w, closeCh) + err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, fieldName, w, closeCh) if err != nil { return err } @@ -273,13 +273,12 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 { // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, - vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error { + vecIndexes []*vecIndexInfo, fieldName string, w *CountHashWriter, closeCh chan struct{}) error { // safe to assume that all the indexes are of the same config values, given // that they are extracted from the field mapping info. 
var dims, metric int var indexOptimizedFor string - var indexOptions index.FieldIndexingOptions var validMerge bool var finalVecIDCap, indexDataCap, reconsCap int @@ -325,6 +324,9 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, return nil } + // assign gpu value from the field options + gpu := v.fieldsOptions[fieldName].UseGPU() + finalVecIDs := make([]int64, 0, finalVecIDCap) // merging of indexes with reconstruction method. // the indexes[i].vecIds has only the valid vecs of this vector @@ -399,7 +401,7 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // May fall back to CPU training if GPU training fails due // to any reason such as insufficient memory, or unavailability // of GPUs, etc. - faissIndex, err = TrainIndex(faissIndex, indexData, dims, indexOptions.UseGPU()) + faissIndex, err = TrainIndex(faissIndex, indexData, dims, gpu) if err != nil { return err } @@ -514,7 +516,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // read vector index options: // was this field configured // to use GPU for training? - useGPU := vo.fieldsOptions + gpu := vo.fieldsOptions[content.name].UseGPU() nvecs := len(ids) nlist := determineCentroids(nvecs) @@ -538,7 +540,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // May fall back to CPU training if GPU training fails due // to any reason such as insufficient memory, or unavailability // of GPUs, etc. 
- faissIndex, err = TrainIndex(faissIndex, vecs, dims) + faissIndex, err = TrainIndex(faissIndex, vecs, dims, gpu) if err != nil { return 0, err } @@ -642,6 +644,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do } //process field + name := field.Name() vec := field.Vector() dim := field.Dims() metric := field.Similarity() @@ -673,6 +676,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do dim: uint16(dim), metric: metric, indexOptimizedFor: indexOptimizedFor, + name: name, } } else { vo.vecFieldMap[fieldID].vecs[subVecHash] = &vecInfo{ @@ -723,11 +727,11 @@ func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) reseta } type indexContent struct { + name string vecs map[int64]*vecInfo dim uint16 metric string indexOptimizedFor string - gpu bool } type vecInfo struct { From 7ca0d80b89ce68de62ce5e02573340504138af2f Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 19:31:43 +0530 Subject: [PATCH 23/23] add go tag --- faiss_vector_gpu.go | 13 +++++++++++-- faiss_vector_gpu_test.go | 3 +++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/faiss_vector_gpu.go b/faiss_vector_gpu.go index 3e994220..b4317343 100644 --- a/faiss_vector_gpu.go +++ b/faiss_vector_gpu.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+//go:build vectors +// +build vectors + package zap import ( @@ -160,23 +163,25 @@ func TrainIndex(index *faiss.IndexImpl, vecs []float32, dims int, useGPU bool) ( // no GPUs available, fallback to CPU training return TrainCPU() } - // lock the selected GPU for the duration of the transfer and training + // lock the selected GPU for the duration of the transfer GPULocks[deviceID].Lock() - defer GPULocks[deviceID].Unlock() // check if enough free memory is available estimatedMemNeeded := uint64(float64(len(vecs)*SizeOfFloat32) * GPUTransferOverheadFactor) // input vectors + overhead freeMem, err := faiss.FreeMemory(deviceID) if err != nil || freeMem < estimatedMemNeeded { // unable to get free memory info or not enough free memory, // fallback to CPU training + GPULocks[deviceID].Unlock() return TrainCPU() } // transfer index to GPU gpuIndex, err := faiss.TransferToGPU(index, deviceID) if err != nil { // transfer failed, fallback to CPU training + GPULocks[deviceID].Unlock() return TrainCPU() } + GPULocks[deviceID].Unlock() // train on GPU err = gpuIndex.Train(vecs) if err != nil { @@ -184,13 +189,17 @@ func TrainIndex(index *faiss.IndexImpl, vecs []float32, dims int, useGPU bool) ( gpuIndex.Close() return TrainCPU() } + // acquire lock again for transfer back to CPU + GPULocks[deviceID].Lock() // transfer back to CPU cpuIndex, err := faiss.TransferToCPU(gpuIndex) gpuIndex.Close() if err != nil { + GPULocks[deviceID].Unlock() // transfer back failed, fallback to CPU training return TrainCPU() } + GPULocks[deviceID].Unlock() // successful GPU training and transfer back to CPU // now free the original index and return the new trained index index.Close() diff --git a/faiss_vector_gpu_test.go b/faiss_vector_gpu_test.go index 32d723c1..b2858404 100644 --- a/faiss_vector_gpu_test.go +++ b/faiss_vector_gpu_test.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+//go:build vectors +// +build vectors + package zap import (