From b28b519d225252b947d6bbb13689a1fc785e0ed1 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Tue, 11 Nov 2025 23:50:12 +0530 Subject: [PATCH 01/18] Introducing Zapx V17 --- build.go | 12 +-- inverted_text_cache.go | 105 +++++++++++++++++++++ segment.go | 208 ++++++++++------------------------------- synonym_cache.go | 18 ++-- thesaurus.go | 2 + 5 files changed, 171 insertions(+), 174 deletions(-) create mode 100644 inverted_text_cache.go diff --git a/build.go b/build.go index 7843653a..19476e9b 100644 --- a/build.go +++ b/build.go @@ -22,11 +22,10 @@ import ( "os" index "github.com/blevesearch/bleve_index_api" - "github.com/blevesearch/vellum" ) -const Version uint32 = 16 -const IndexSectionsVersion uint32 = 16 +const Version uint32 = 17 + const Type string = "zap" const fieldNotUninverted = math.MaxUint64 @@ -171,19 +170,18 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), docValueOffset: 0, // docValueOffsets identified automatically by the section updatedFields: make(map[string]*index.UpdateFieldInfo), - fieldFSTs: make(map[uint16]*vellum.FST), + invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), - // following fields gets populated by loadFieldsNew + // following fields gets populated by loadFields fieldsMap: make(map[string]uint16), - dictLocs: make([]uint64, 0), fieldsInv: make([]string, 0), } sb.updateSize() // load the data/section starting offsets for each field // by via the sectionsIndexOffset as starting point. - err := sb.loadFieldsNew() + err := sb.loadFields() if err != nil { return nil, err } diff --git a/inverted_text_cache.go b/inverted_text_cache.go new file mode 100644 index 00000000..c77002ef --- /dev/null +++ b/inverted_text_cache.go @@ -0,0 +1,105 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "encoding/binary" + "fmt" + "sync" + + "github.com/blevesearch/vellum" +) + +func newInvertedIndexCache() *invertedIndexCache { + return &invertedIndexCache{ + cache: make(map[uint16]*invertedCacheEntry), + } +} + +type invertedIndexCache struct { + m sync.RWMutex + + cache map[uint16]*invertedCacheEntry +} + +// Clear clears the synonym cache which would mean tha the termID to term map would no longer be available. +func (sc *invertedIndexCache) Clear() { + sc.m.Lock() + sc.cache = nil + sc.m.Unlock() +} + +// loadOrCreate loads the inverted index cache for the specified fieldID if it is already present, +// or creates it if not. The inverted index cache for a fieldID consists of an FST (Finite State Transducer): +// - A Vellum FST (Finite State Transducer) representing the TermDictionary. +// This function returns the loaded or newly created FST, and the number of bytes read from the provided memory slice, +// if the cache was created. 
+func (sc *invertedIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, uint64, error) { + sc.m.RLock() + entry, ok := sc.cache[fieldID] + if ok { + sc.m.RUnlock() + return entry.load() + } + + sc.m.RUnlock() + + sc.m.Lock() + defer sc.m.Unlock() + + entry, ok = sc.cache[fieldID] + if ok { + return entry.load() + } + + return sc.createAndCacheLOCKED(fieldID, mem) +} + +// createAndCacheLOCKED creates the inverted index cache for the specified fieldID and caches it. +func (sc *invertedIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, uint64, error) { + var pos uint64 + vellumLen, read := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) + if vellumLen == 0 || read <= 0 { + return nil, 0, fmt.Errorf("vellum length is 0") + } + pos += uint64(read) + fstBytes := mem[pos : pos+vellumLen] + fst, err := vellum.Load(fstBytes) + if err != nil { + return nil, 0, fmt.Errorf("vellum err: %v", err) + } + pos += vellumLen + sc.insertLOCKED(fieldID, fst) + return fst, pos, nil +} + +// insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID. +func (sc *invertedIndexCache) insertLOCKED(fieldID uint16, fst *vellum.FST) { + _, ok := sc.cache[fieldID] + if !ok { + sc.cache[fieldID] = &invertedCacheEntry{ + fst: fst, + } + } +} + +// invertedCacheEntry is the vellum FST and is the value stored in the invertedIndexCache cache, for a given fieldID. +type invertedCacheEntry struct { + fst *vellum.FST +} + +func (ce *invertedCacheEntry) load() (*vellum.FST, uint64, error) { + return ce.fst, 0, nil +} diff --git a/segment.go b/segment.go index 461fdf5a..1459fb95 100644 --- a/segment.go +++ b/segment.go @@ -28,7 +28,6 @@ import ( index "github.com/blevesearch/bleve_index_api" mmap "github.com/blevesearch/mmap-go" segment "github.com/blevesearch/scorch_segment_api/v2" - "github.com/blevesearch/vellum" "github.com/golang/snappy" ) @@ -55,7 +54,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { rv := &Segment{ SegmentBase: SegmentBase{ fieldsMap: make(map[string]uint16), - fieldFSTs: make(map[uint16]*vellum.FST), + invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), @@ -73,7 +72,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { return nil, err } - err = rv.loadFieldsNew() + err = rv.loadFields() if err != nil { _ = rv.Close() return nil, err @@ -105,17 +104,14 @@ type SegmentBase struct { fieldsIndexOffset uint64 sectionsIndexOffset uint64 docValueOffset uint64 - dictLocs []uint64 fieldDvReaders []map[uint16]*docValueReader // naive chunk cache per field; section->field->reader fieldDvNames []string // field names cached in fieldDvReaders size uint64 updatedFields map[string]*index.UpdateFieldInfo - m sync.Mutex - fieldFSTs map[uint16]*vellum.FST - - // this cache comes into play when vectors are supported in builds. 
+ // section-specific cache + invIndexCache *invertedIndexCache vecIndexCache *vectorIndexCache synIndexCache *synonymIndexCache } @@ -133,11 +129,10 @@ func (sb *SegmentBase) updateSize() { sizeInBytes += (len(k) + SizeOfString) + SizeOfUint16 } - // fieldsInv, dictLocs + // fieldsInv for _, entry := range sb.fieldsInv { sizeInBytes += len(entry) + SizeOfString } - sizeInBytes += len(sb.dictLocs) * SizeOfUint64 // fieldDvReaders for _, secDvReaders := range sb.fieldDvReaders { @@ -155,6 +150,7 @@ func (sb *SegmentBase) updateSize() { func (sb *SegmentBase) AddRef() {} func (sb *SegmentBase) DecRef() (err error) { return nil } func (sb *SegmentBase) Close() (err error) { + sb.invIndexCache.Clear() sb.vecIndexCache.Clear() sb.synIndexCache.Clear() return nil @@ -212,7 +208,7 @@ func (s *Segment) loadConfig() error { verOffset := crcOffset - 4 s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) - if Version < IndexSectionsVersion && s.version != Version { + if s.version != Version { return fmt.Errorf("unsupported version %d != %d", s.version, Version) } @@ -226,15 +222,9 @@ func (s *Segment) loadConfig() error { // determining the right footer size based on version, this becomes important // while loading the fields portion or the sections portion of the index file. - var footerSize int - if s.version >= IndexSectionsVersion { - // for version 16 and above, parse the sectionsIndexOffset - s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) - fieldsIndexOffset = fieldsIndexOffset - 8 - footerSize = FooterSize - } else { - footerSize = FooterSize - 8 - } + s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) + fieldsIndexOffset = fieldsIndexOffset - 8 + footerSize := FooterSize s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) @@ -287,48 +277,16 @@ func (sb *SegmentBase) BytesRead() uint64 { func (sb *SegmentBase) ResetBytesRead(val uint64) {} func (sb *SegmentBase) incrementBytesRead(val uint64) { + fmt.Println("oldValue - bytesRead - ", sb.bytesRead) atomic.AddUint64(&sb.bytesRead, val) + fmt.Println("newValue - bytesRead - ", sb.bytesRead) } func (sb *SegmentBase) loadFields() error { - // NOTE for now we assume the fields index immediately precedes - // the footer, and if this changes, need to adjust accordingly (or - // store explicit length), where s.mem was sliced from s.mm in Open(). 
- fieldsIndexEnd := uint64(len(sb.mem)) - - // iterate through fields index - var fieldID uint64 - for sb.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd { - addr := binary.BigEndian.Uint64(sb.mem[sb.fieldsIndexOffset+(8*fieldID) : sb.fieldsIndexOffset+(8*fieldID)+8]) - - // accounting the address of the dictLoc being read from file - sb.incrementBytesRead(8) - - dictLoc, read := binary.Uvarint(sb.mem[addr:fieldsIndexEnd]) - n := uint64(read) - sb.dictLocs = append(sb.dictLocs, dictLoc) - - var nameLen uint64 - nameLen, read = binary.Uvarint(sb.mem[addr+n : fieldsIndexEnd]) - n += uint64(read) - - name := string(sb.mem[addr+n : addr+n+nameLen]) - - sb.incrementBytesRead(n + nameLen) - sb.fieldsInv = append(sb.fieldsInv, name) - sb.fieldsMap[name] = uint16(fieldID + 1) - - fieldID++ - } - return nil -} - -func (sb *SegmentBase) loadFieldsNew() error { pos := sb.sectionsIndexOffset if pos == 0 { - // this is the case only for older file formats - return sb.loadFields() + return fmt.Errorf("no sections index present") } seek := pos + binary.MaxVarintLen64 @@ -360,7 +318,7 @@ func (sb *SegmentBase) loadFieldsNew() error { fieldSectionMap := make(map[uint16]uint64) - err := sb.loadFieldNew(uint16(fieldID), addr, fieldSectionMap) + err := sb.loadField(uint16(fieldID), addr, fieldSectionMap) if err != nil { return err } @@ -374,7 +332,7 @@ func (sb *SegmentBase) loadFieldsNew() error { return nil } -func (sb *SegmentBase) loadFieldNew(fieldID uint16, pos uint64, +func (sb *SegmentBase) loadField(fieldID uint16, pos uint64, fieldSectionMap map[uint16]uint64) error { if pos == 0 { // there is no indexing structure present for this field/section @@ -401,28 +359,6 @@ func (sb *SegmentBase) loadFieldNew(fieldID uint16, pos uint64, fieldSectionAddr := binary.BigEndian.Uint64(sb.mem[pos : pos+8]) pos += 8 fieldSectionMap[fieldSectionType] = fieldSectionAddr - if fieldSectionType == SectionInvertedTextIndex { - // for the fields which don't have the inverted index, the offset is - // 0 and during query time, because there is no valid dictionary we - // will just have follow a no-op path. - if fieldSectionAddr == 0 { - sb.dictLocs = append(sb.dictLocs, 0) - continue - } - - read := 0 - // skip the doc values - _, n := binary.Uvarint(sb.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64]) - fieldSectionAddr += uint64(n) - read += n - _, n = binary.Uvarint(sb.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64]) - fieldSectionAddr += uint64(n) - read += n - dictLoc, n := binary.Uvarint(sb.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64]) - // account the bytes read while parsing the field's inverted index section - sb.incrementBytesRead(uint64(read + n)) - sb.dictLocs = append(sb.dictLocs, dictLoc) - } } // account the bytes read while parsing the sections field index. 
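For orientation, the loop above implies the on-disk shape of one field entry in the new sections index: a name-length uvarint, the name bytes, a num-sections uvarint, then one (section type uvarint, 8-byte big-endian address) pair per section, where an address of 0 means the section is absent for that field. A rough reader-side sketch with illustrative names and no error handling or bytes-read accounting (readFieldEntry is not code from this patch):

package example

import "encoding/binary"

// readFieldEntry decodes one field entry of the v17 sections index, as
// implied by loadField above. Sketch only; bounds checks are omitted.
func readFieldEntry(mem []byte, pos uint64) (string, map[uint16]uint64) {
	nameLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
	pos += uint64(n)
	name := string(mem[pos : pos+nameLen])
	pos += nameLen

	numSections, sz := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
	pos += uint64(sz)
	sections := make(map[uint16]uint64, numSections)
	for i := uint64(0); i < numSections; i++ {
		secType, k := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
		pos += uint64(k)
		// each section records an 8-byte big-endian offset into the segment;
		// 0 means no indexing structure is present for this field/section
		addr := binary.BigEndian.Uint64(mem[pos : pos+8])
		pos += 8
		sections[uint16(secType)] = addr
	}
	return name, sections
}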
@@ -441,41 +377,33 @@ func (sb *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { fieldIDPlus1 := sb.fieldsMap[field] - if fieldIDPlus1 > 0 { + if fieldIDPlus1 == 0 { + return nil, nil + } + pos := sb.fieldsSectionsMap[fieldIDPlus1-1][SectionInvertedTextIndex] + if pos > 0 { rv = &Dictionary{ sb: sb, field: field, fieldID: fieldIDPlus1 - 1, } - - dictStart := sb.dictLocs[rv.fieldID] - if dictStart > 0 { - var ok bool - sb.m.Lock() - if rv.fst, ok = sb.fieldFSTs[rv.fieldID]; !ok { - // read the length of the vellum data - vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64]) - if vellumLen == 0 { - sb.m.Unlock() - return nil, fmt.Errorf("empty dictionary for field: %v", field) - } - fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] - rv.incrementBytesRead(uint64(read) + vellumLen) - rv.fst, err = vellum.Load(fstBytes) - if err != nil { - sb.m.Unlock() - return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, err) - } - - sb.fieldFSTs[rv.fieldID] = rv.fst - } - - sb.m.Unlock() - rv.fstReader, err = rv.fst.Reader() - if err != nil { - return nil, fmt.Errorf("dictionary field %s vellum reader err: %v", field, err) - } + // skip the doc value offsets to get to the dictionary portion + for i := 0; i < 2; i++ { + _, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + } + dictLoc, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + fst, bytesRead, err := sb.invIndexCache.loadOrCreate(rv.fieldID, sb.mem[dictLoc:]) + if err != nil { + return nil, fmt.Errorf("dictionary for field %s err: %v", field, err) } + rv.fst = fst + rv.fstReader, err = rv.fst.Reader() + if err != nil { + return nil, fmt.Errorf("dictionary nafor field %s, vellum reader err: %v", field, err) + } + rv.bytesRead += bytesRead } return rv, nil @@ -509,7 +437,7 @@ func (sb *SegmentBase) thesaurus(name string) (rv *Thesaurus, err error) { } thesLoc, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) - fst, synTermMap, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:]) + fst, synTermMap, bytesRead, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:]) if err != nil { return nil, fmt.Errorf("thesaurus name %s err: %v", name, err) } @@ -519,6 +447,7 @@ func (sb *SegmentBase) thesaurus(name string) (rv *Thesaurus, err error) { if err != nil { return nil, fmt.Errorf("thesaurus name %s vellum reader err: %v", name, err) } + rv.bytesRead += bytesRead } return rv, nil } @@ -700,6 +629,7 @@ func (s *Segment) Close() (err error) { func (s *Segment) closeActual() (err error) { // clear contents from the vector and synonym index cache before un-mmapping + s.invIndexCache.Clear() s.vecIndexCache.Clear() s.synIndexCache.Clear() @@ -765,10 +695,18 @@ func (s *Segment) NumDocs() uint64 { func (s *Segment) DictAddr(field string) (uint64, error) { fieldIDPlus1, ok := s.fieldsMap[field] if !ok { - return 0, fmt.Errorf("no such field '%s'", field) + return 0, fmt.Errorf("no dictionary for field '%s'", field) } - - return s.dictLocs[fieldIDPlus1-1], nil + dictStart := s.fieldsSectionsMap[fieldIDPlus1-1][SectionInvertedTextIndex] + if dictStart == 0 { + return 0, fmt.Errorf("no dictionary for field '%s'", field) + } + for i := 0; i < 2; i++ { + _, n := binary.Uvarint(s.mem[dictStart : dictStart+binary.MaxVarintLen64]) + dictStart += uint64(n) + } + dictLoc, _ := 
binary.Uvarint(s.mem[dictStart : dictStart+binary.MaxVarintLen64]) + return dictLoc, nil } // ThesaurusAddr is a helper function to compute the file offset where the @@ -842,48 +780,6 @@ func (s *Segment) loadDvReader(fieldID int, secID uint16) error { return nil } -func (s *Segment) loadDvReadersLegacy() error { - // older file formats to parse the docValueIndex and if that says doc values - // aren't there in this segment file, just return nil - if s.docValueOffset == fieldNotUninverted { - return nil - } - - for fieldID := range s.fieldsInv { - var read uint64 - start, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) - if n <= 0 { - return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %d", fieldID) - } - read += uint64(n) - end, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) - if n <= 0 { - return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID) - } - read += uint64(n) - s.incrementBytesRead(read) - - fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], start, end) - if err != nil { - return err - } - - if fieldDvReader != nil { - // older file formats have docValues corresponding only to inverted index - // ignore the rest. - if s.fieldDvReaders[SectionInvertedTextIndex] == nil { - s.fieldDvReaders[SectionInvertedTextIndex] = make(map[uint16]*docValueReader) - } - // fix the structure of fieldDvReaders - // currently it populates the inverted index doc values - s.fieldDvReaders[SectionInvertedTextIndex][uint16(fieldID)] = fieldDvReader - s.fieldDvNames = append(s.fieldDvNames, s.fieldsInv[fieldID]) - } - } - - return nil -} - // Segment is a file segment, and loading the dv readers from that segment // must account for the version while loading since the formats are different // in the older and the Version version. @@ -892,10 +788,6 @@ func (s *Segment) loadDvReaders() error { return nil } - if s.version < IndexSectionsVersion { - return s.loadDvReadersLegacy() - } - // for every section of every field, load the doc values and register // the readers. for fieldID := range s.fieldsInv { diff --git a/synonym_cache.go b/synonym_cache.go index 0b8d56c2..a4d24f92 100644 --- a/synonym_cache.go +++ b/synonym_cache.go @@ -46,7 +46,7 @@ func (sc *synonymIndexCache) Clear() { // - A Vellum FST (Finite State Transducer) representing the thesaurus. // - A map associating synonym IDs to their corresponding terms. // This function returns the loaded or newly created tuple (FST and map). -func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) { +func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, uint64, error) { sc.m.RLock() entry, ok := sc.cache[fieldID] if ok { @@ -68,23 +68,23 @@ func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.F } // createAndCacheLOCKED creates the synonym index cache for the specified fieldID and caches it. 
-func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) { +func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, uint64, error) { var pos uint64 vellumLen, read := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) if vellumLen == 0 || read <= 0 { - return nil, nil, fmt.Errorf("vellum length is 0") + return nil, nil, 0, fmt.Errorf("vellum length is 0") } pos += uint64(read) fstBytes := mem[pos : pos+vellumLen] fst, err := vellum.Load(fstBytes) if err != nil { - return nil, nil, fmt.Errorf("vellum err: %v", err) + return nil, nil, 0, fmt.Errorf("vellum err: %v", err) } pos += vellumLen numSyns, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) if numSyns == 0 { - return nil, nil, fmt.Errorf("no synonyms found") + return nil, nil, 0, fmt.Errorf("no synonyms found") } synTermMap := make(map[uint32][]byte, numSyns) for i := 0; i < int(numSyns); i++ { @@ -93,14 +93,14 @@ func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (* termLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) if termLen == 0 { - return nil, nil, fmt.Errorf("term length is 0") + return nil, nil, 0, fmt.Errorf("term length is 0") } term := mem[pos : pos+uint64(termLen)] pos += uint64(termLen) synTermMap[uint32(synID)] = term } sc.insertLOCKED(fieldID, fst, synTermMap) - return fst, synTermMap, nil + return fst, synTermMap, pos, nil } // insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID. @@ -121,6 +121,6 @@ type synonymCacheEntry struct { synTermMap map[uint32][]byte } -func (ce *synonymCacheEntry) load() (*vellum.FST, map[uint32][]byte, error) { - return ce.fst, ce.synTermMap, nil +func (ce *synonymCacheEntry) load() (*vellum.FST, map[uint32][]byte, uint64, error) { + return ce.fst, ce.synTermMap, 0, nil } diff --git a/thesaurus.go b/thesaurus.go index 34a43629..f97aaf29 100644 --- a/thesaurus.go +++ b/thesaurus.go @@ -33,6 +33,8 @@ type Thesaurus struct { fst *vellum.FST fstReader *vellum.Reader + + bytesRead uint64 } // represents an immutable, empty Thesaurus From d974de5db64985f5a8bfc93b1985cd58c3758b13 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Tue, 11 Nov 2025 23:57:33 +0530 Subject: [PATCH 02/18] remove debug print statements --- segment.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/segment.go b/segment.go index 1459fb95..9e5fe146 100644 --- a/segment.go +++ b/segment.go @@ -277,9 +277,7 @@ func (sb *SegmentBase) BytesRead() uint64 { func (sb *SegmentBase) ResetBytesRead(val uint64) {} func (sb *SegmentBase) incrementBytesRead(val uint64) { - fmt.Println("oldValue - bytesRead - ", sb.bytesRead) atomic.AddUint64(&sb.bytesRead, val) - fmt.Println("newValue - bytesRead - ", sb.bytesRead) } func (sb *SegmentBase) loadFields() error { From 97898ccfc74f13595582409887c731960d330d4a Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 00:02:27 +0530 Subject: [PATCH 03/18] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- inverted_text_cache.go | 4 ++-- segment.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/inverted_text_cache.go b/inverted_text_cache.go index c77002ef..01cef2e1 100644 --- a/inverted_text_cache.go +++ b/inverted_text_cache.go @@ -34,7 +34,7 @@ type invertedIndexCache struct { cache map[uint16]*invertedCacheEntry } -// Clear clears the synonym 
cache which would mean tha the termID to term map would no longer be available.
+// Clear clears the inverted index cache, which means that the cached term dictionary FSTs would no longer be available.
 func (sc *invertedIndexCache) Clear() {
 	sc.m.Lock()
 	sc.cache = nil
 	sc.m.Unlock()
@@ -85,7 +85,7 @@ func (sc *invertedIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (
 	return fst, pos, nil
 }
 
-// insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID.
+// insertLOCKED inserts the vellum FST into the cache for the specified fieldID.
 func (sc *invertedIndexCache) insertLOCKED(fieldID uint16, fst *vellum.FST) {
 	_, ok := sc.cache[fieldID]
 	if !ok {
diff --git a/segment.go b/segment.go
index 9e5fe146..9952ffcf 100644
--- a/segment.go
+++ b/segment.go
@@ -399,7 +399,7 @@
 		rv.fst = fst
 		rv.fstReader, err = rv.fst.Reader()
 		if err != nil {
-			return nil, fmt.Errorf("dictionary nafor field %s, vellum reader err: %v", field, err)
+			return nil, fmt.Errorf("dictionary for field %s, vellum reader err: %v", field, err)
 		}
 		rv.bytesRead += bytesRead
 	}

From 583d6dcba82fceb45e08259ea7bb63888dbbbb70 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 12 Nov 2025 00:07:46 +0530
Subject: [PATCH 04/18] small error msg fix

---
 segment.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/segment.go b/segment.go
index 9952ffcf..d962b7d0 100644
--- a/segment.go
+++ b/segment.go
@@ -693,7 +693,7 @@ func (s *Segment) NumDocs() uint64 {
 func (s *Segment) DictAddr(field string) (uint64, error) {
 	fieldIDPlus1, ok := s.fieldsMap[field]
 	if !ok {
-		return 0, fmt.Errorf("no dictionary for field '%s'", field)
+		return 0, fmt.Errorf("no such field '%s'", field)
 	}
 	dictStart := s.fieldsSectionsMap[fieldIDPlus1-1][SectionInvertedTextIndex]

From 187f4e91e11b9081d06414a75dae549318d881f0 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 12 Nov 2025 00:24:35 +0530
Subject: [PATCH 05/18] remove redundant docvalue reader code

---
 segment.go | 78 ------------------------------------------------------
 1 file changed, 78 deletions(-)

diff --git a/segment.go b/segment.go
index d962b7d0..5f3dcf99 100644
--- a/segment.go
+++ b/segment.go
@@ -726,91 +726,13 @@ func (s *Segment) ThesaurusAddr(name string) (uint64, error) {
 	return thesLoc, nil
 }
 
-func (s *Segment) getSectionDvOffsets(fieldID int, secID uint16) (uint64, uint64, uint64, error) {
-	// Version is gonna be 16
-	var fieldLocStart uint64 = fieldNotUninverted
-	fieldLocEnd := fieldLocStart
-	sectionMap := s.fieldsSectionsMap[fieldID]
-	fieldAddrStart := sectionMap[secID]
-	n := 0
-
-	if fieldAddrStart > 0 {
-		// fixed encoding as of now, need to uvarint this
-		var read uint64
-		fieldLocStart, n = binary.Uvarint(s.mem[fieldAddrStart+read : fieldAddrStart+read+binary.MaxVarintLen64])
-		if n <= 0 {
-			return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %d", fieldID)
-		}
-		read += uint64(n)
-
-		fieldLocEnd, n = binary.Uvarint(s.mem[fieldAddrStart+read : fieldAddrStart+read+binary.MaxVarintLen64])
-		if n <= 0 {
-			return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID)
-		}
-		read += uint64(n)
-
-		s.incrementBytesRead(read)
-	}
-
-	return fieldLocStart, fieldLocEnd, 0, nil
-}
-
-func (s *Segment) loadDvReader(fieldID int, secID uint16) error {
-	start, end, _, err := s.getSectionDvOffsets(fieldID, secID)
-	if err != nil {
-		return err
-	}
-
-	
fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], start, end) - if err != nil { - return err - } - - if fieldDvReader != nil { - if s.fieldDvReaders[secID] == nil { - s.fieldDvReaders[secID] = make(map[uint16]*docValueReader) - } - // fix the structure of fieldDvReaders - // currently it populates the inverted index doc values - s.fieldDvReaders[secID][uint16(fieldID)] = fieldDvReader - s.fieldDvNames = append(s.fieldDvNames, s.fieldsInv[fieldID]) - } - return nil -} - -// Segment is a file segment, and loading the dv readers from that segment -// must account for the version while loading since the formats are different -// in the older and the Version version. -func (s *Segment) loadDvReaders() error { - if s.numDocs == 0 { - return nil - } - - // for every section of every field, load the doc values and register - // the readers. - for fieldID := range s.fieldsInv { - for secID := range segmentSections { - s.loadDvReader(fieldID, secID) - } - } - - return nil -} - -// since segmentBase is an in-memory segment, it can be called only -// for v16 file formats as part of InitSegmentBase() while introducing -// a segment into the system. func (sb *SegmentBase) loadDvReaders() error { - - // evaluate -> s.docValueOffset == fieldNotUninverted if sb.numDocs == 0 { return nil } - for fieldID, sections := range sb.fieldsSectionsMap { for secID, secOffset := range sections { if secOffset > 0 { - // fixed encoding as of now, need to uvarint this pos := secOffset var read uint64 fieldLocStart, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) From 2a205bbc3853a1dddf6cbc45de646a43adc189d5 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 01:34:11 +0530 Subject: [PATCH 06/18] fix footer --- build.go | 5 +--- cmd/zap/cmd/docvalue.go | 2 +- cmd/zap/cmd/fields.go | 2 +- cmd/zap/cmd/footer.go | 3 +-- cmd/zap/cmd/synonym.go | 2 +- cmd/zap/cmd/vector.go | 2 +- merge.go | 4 +-- segment.go | 54 ++++++++++++++++++----------------------- write.go | 24 ++++++------------ zap.md | 34 ++++++++++++-------------- 10 files changed, 54 insertions(+), 78 deletions(-) diff --git a/build.go b/build.go index 19476e9b..ee3c3b09 100644 --- a/build.go +++ b/build.go @@ -98,8 +98,7 @@ func persistSegmentBaseToWriter(sb *SegmentBase, w io.Writer) (int, error) { return 0, err } - err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.sectionsIndexOffset, - sb.docValueOffset, sb.chunkMode, sb.memCRC, br) + err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.sectionsIndexOffset, sb.chunkMode, sb.memCRC, br) if err != nil { return 0, err } @@ -165,10 +164,8 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 chunkMode: chunkMode, numDocs: numDocs, storedIndexOffset: storedIndexOffset, - fieldsIndexOffset: sectionsIndexOffset, sectionsIndexOffset: sectionsIndexOffset, fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), - docValueOffset: 0, // docValueOffsets identified automatically by the section updatedFields: make(map[string]*index.UpdateFieldInfo), invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), diff --git a/cmd/zap/cmd/docvalue.go b/cmd/zap/cmd/docvalue.go index 86cf427d..d28e35dd 100644 --- a/cmd/zap/cmd/docvalue.go +++ b/cmd/zap/cmd/docvalue.go @@ -193,7 +193,7 @@ func docValueCmd(cmd *cobra.Command, args []string) error { data := segment.Data() // iterate through fields index - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { 
// this is the case only for older file formats return fmt.Errorf("file format not supported") diff --git a/cmd/zap/cmd/fields.go b/cmd/zap/cmd/fields.go index 1b0bb8b9..1eec7882 100644 --- a/cmd/zap/cmd/fields.go +++ b/cmd/zap/cmd/fields.go @@ -32,7 +32,7 @@ var fieldsCmd = &cobra.Command{ } data := segment.Data() - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats return fmt.Errorf("file format not supported") diff --git a/cmd/zap/cmd/footer.go b/cmd/zap/cmd/footer.go index 1423a6b6..07d8bbb8 100644 --- a/cmd/zap/cmd/footer.go +++ b/cmd/zap/cmd/footer.go @@ -31,9 +31,8 @@ var footerCmd = &cobra.Command{ fmt.Printf("CRC: %#x\n", segment.CRC()) fmt.Printf("Version: %d\n", segment.Version()) fmt.Printf("Chunk Mode: %d\n", segment.ChunkMode()) - fmt.Printf("Fields Idx: %d (%#x)\n", segment.FieldsIndexOffset(), segment.FieldsIndexOffset()) + fmt.Printf("Sections Idx: %d (%#x)\n", segment.SectionsIndexOffset(), segment.SectionsIndexOffset()) fmt.Printf("Stored Idx: %d (%#x)\n", segment.StoredIndexOffset(), segment.StoredIndexOffset()) - fmt.Printf("DocValue Idx: %d (%#x)\n", segment.DocValueOffset(), segment.DocValueOffset()) fmt.Printf("Num Docs: %d\n", segment.NumDocs()) return nil }, diff --git a/cmd/zap/cmd/synonym.go b/cmd/zap/cmd/synonym.go index 1b3f7101..721c4451 100644 --- a/cmd/zap/cmd/synonym.go +++ b/cmd/zap/cmd/synonym.go @@ -29,7 +29,7 @@ var thesaurusCmd = &cobra.Command{ Short: "thesaurus prints the thesaurus with the specified name", Long: `The thesaurus command lets you print the thesaurus with the specified name.`, RunE: func(cmd *cobra.Command, args []string) error { - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats return fmt.Errorf("file format not supported") diff --git a/cmd/zap/cmd/vector.go b/cmd/zap/cmd/vector.go index ffa4dabd..ce16b2e3 100644 --- a/cmd/zap/cmd/vector.go +++ b/cmd/zap/cmd/vector.go @@ -40,7 +40,7 @@ var vectorCmd = &cobra.Command{ 4. 
reconstruct vector given the vectorID.`, RunE: func(cmd *cobra.Command, args []string) error { data := segment.Data() - pos := segment.FieldsIndexOffset() + pos := segment.SectionsIndexOffset() if pos == 0 { // this is the case only for older file formats diff --git a/merge.go b/merge.go index 6197af11..e0fcaf4c 100644 --- a/merge.go +++ b/merge.go @@ -81,9 +81,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat return nil, 0, err } - // passing the sectionsIndexOffset as fieldsIndexOffset and the docValueOffset as 0 for the footer - err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, sectionsIndexOffset, - 0, chunkMode, cr.Sum32(), cr) + err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, chunkMode, cr.Sum32(), cr) if err != nil { cleanup() return nil, 0, err diff --git a/segment.go b/segment.go index 5f3dcf99..38e3b3a1 100644 --- a/segment.go +++ b/segment.go @@ -101,16 +101,15 @@ type SegmentBase struct { fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address numDocs uint64 storedIndexOffset uint64 - fieldsIndexOffset uint64 sectionsIndexOffset uint64 - docValueOffset uint64 fieldDvReaders []map[uint16]*docValueReader // naive chunk cache per field; section->field->reader fieldDvNames []string // field names cached in fieldDvReaders size uint64 + // index update specific tracking updatedFields map[string]*index.UpdateFieldInfo - // section-specific cache + // section-specific caches invIndexCache *invertedIndexCache vecIndexCache *vectorIndexCache synIndexCache *synonymIndexCache @@ -203,41 +202,39 @@ func (s *Segment) DecRef() (err error) { } func (s *Segment) loadConfig() error { + // read offsets of 32 bit values - crc, ver, chunk crcOffset := len(s.mm) - 4 + verOffset := crcOffset - 4 + chunkOffset := verOffset - 4 + + // read offsets of 64 bit values - sectionsIndexOffset, storedIndexOffset, numDocsOffset + sectionsIndexOffset := chunkOffset - 8 + storedIndexOffset := sectionsIndexOffset - 8 + numDocsOffset := storedIndexOffset - 8 + + // read 32-bit crc s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4]) - verOffset := crcOffset - 4 + // read 32-bit version s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) if s.version != Version { return fmt.Errorf("unsupported version %d != %d", s.version, Version) } - chunkOffset := verOffset - 4 + // read 32-bit chunk mode s.chunkMode = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4]) - docValueOffset := chunkOffset - 8 - s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8]) + // read 64-bit sections index offset + s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[sectionsIndexOffset : sectionsIndexOffset+8]) - fieldsIndexOffset := docValueOffset - 8 - - // determining the right footer size based on version, this becomes important - // while loading the fields portion or the sections portion of the index file. 
- s.sectionsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) - fieldsIndexOffset = fieldsIndexOffset - 8 - footerSize := FooterSize - - s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) - - storedIndexOffset := fieldsIndexOffset - 8 + // read 64-bit stored index offset s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8]) - numDocsOffset := storedIndexOffset - 8 + // read 64-bit num docs s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8]) - // 8*4 + 4*3 = 44 bytes being accounted from all the offsets - // above being read from the file - s.incrementBytesRead(uint64(footerSize)) - s.SegmentBase.mem = s.mm[:len(s.mm)-footerSize] + s.incrementBytesRead(uint64(FooterSize)) + s.SegmentBase.mem = s.mm[:len(s.mm)-FooterSize] return nil } @@ -668,9 +665,9 @@ func (s *Segment) ChunkMode() uint32 { return s.chunkMode } -// FieldsIndexOffset returns the fields index offset in the file footer -func (s *Segment) FieldsIndexOffset() uint64 { - return s.fieldsIndexOffset +// SectionsIndexOffset returns the sections index offset in the file footer +func (s *Segment) SectionsIndexOffset() uint64 { + return s.sectionsIndexOffset } // StoredIndexOffset returns the stored value index offset in the file footer @@ -678,11 +675,6 @@ func (s *Segment) StoredIndexOffset() uint64 { return s.storedIndexOffset } -// DocValueOffset returns the docValue offset in the file footer -func (s *Segment) DocValueOffset() uint64 { - return s.docValueOffset -} - // NumDocs returns the number of documents in the file footer func (s *Segment) NumDocs() uint64 { return s.numDocs diff --git a/write.go b/write.go index 4e7f5523..39382d05 100644 --- a/write.go +++ b/write.go @@ -104,12 +104,11 @@ func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int } // FooterSize is the size of the footer record in bytes -// crc + ver + chunk + docValueOffset + sectionsIndexOffset + field offset + stored offset + num docs -const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8 + 8 +// crc + ver + chunk + sectionsIndexOffset + stored offset + num docs +const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 -// in the index sections format, the fieldsIndexOffset points to the sectionsIndexOffset -func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, sectionsIndexOffset, docValueOffset uint64, - chunkMode uint32, crcBeforeFooter uint32, writerIn io.Writer) error { +func persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset uint64, + chunkMode, crcBeforeFooter uint32, writerIn io.Writer) error { w := NewCountHashWriter(writerIn) w.crc = crcBeforeFooter @@ -118,38 +117,31 @@ func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, sectionsIndexO if err != nil { return err } + // write out the stored field index location: err = binary.Write(w, binary.BigEndian, storedIndexOffset) if err != nil { return err } - // write out the field index location - err = binary.Write(w, binary.BigEndian, fieldsIndexOffset) - if err != nil { - return err - } - // write out the new field index location (to be removed later, as this can eventually replace the old) + // write out the sections index location err = binary.Write(w, binary.BigEndian, sectionsIndexOffset) if err != nil { return err } - // write out the fieldDocValue location - err = binary.Write(w, binary.BigEndian, docValueOffset) - if err != nil { - return err - } // write out 32-bit chunk factor err = binary.Write(w, binary.BigEndian, 
chunkMode) if err != nil { return err } + // write out 32-bit version err = binary.Write(w, binary.BigEndian, Version) if err != nil { return err } + // write out CRC-32 of everything upto but not including this CRC err = binary.Write(w, binary.BigEndian, w.crc) if err != nil { diff --git a/zap.md b/zap.md index 125cadb1..c9dc529d 100644 --- a/zap.md +++ b/zap.md @@ -36,21 +36,21 @@ Footer section describes the configuration of particular ZAP file. The format of footer is version-dependent, so it is necessary to check `V` field before the parsing. - +==================================================+ - | Stored Fields | - |==================================================| - +-----> | Stored Fields Index | - | |==================================================| - | | Inverted Text Index Section | - | |==================================================| - | | Vector Index Section | - | |==================================================| - | | Sections Info | - | |==================================================| - | +-> | Sections Index | - | | |========+========+====+=====+======+====+====+====| - | | | D# | SF | F | S | FDV | CF | V | CC | (Footer) - | | +========+====+===+====+==+==+======+====+====+====+ + +===================================================+ + | Stored Fields | + |===================================================| + +-----> | Stored Fields Index | + | |===================================================| + | | Inverted Text Index Section | + | |===================================================| + | | Vector Index Section | + | |===================================================| + | | Sections Info | + | |===================================================| + | +-> | Sections Index | + | | |========+========+=======+========+=======+========| + | | | D# | SF | S | CF | V | CC | (Footer) + | | +========+========+=======+========+=======+========+ | | | | +---------------------+ | |-----------------------------+ @@ -58,9 +58,7 @@ Footer section describes the configuration of particular ZAP file. The format of D#. Number of Docs. SF. Stored Fields Index Offset. - F. Field Index Offset. S. Sections Index Offset - FDV. Field DocValue Offset. CF. Chunk Factor. V. Version. CC. CRC32. 
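To make the new footer concrete, a minimal reader-side sketch of decoding it from the tail of a v17 segment file, mirroring loadConfig in segment.go and the 36-byte FooterSize above (illustrative names, no bounds checks; not code from this patch):

package example

import "encoding/binary"

// parseFooter decodes the fixed-size v17 footer, reading backwards from EOF:
// ... | D# (8) | SF (8) | S (8) | CF (4) | V (4) | CC (4) |
func parseFooter(mm []byte) (numDocs, storedIdx, sectionsIdx uint64, chunkFactor, version, crc uint32) {
	n := len(mm)
	crc = binary.BigEndian.Uint32(mm[n-4 : n])
	version = binary.BigEndian.Uint32(mm[n-8 : n-4])
	chunkFactor = binary.BigEndian.Uint32(mm[n-12 : n-8])
	sectionsIdx = binary.BigEndian.Uint64(mm[n-20 : n-12])
	storedIdx = binary.BigEndian.Uint64(mm[n-28 : n-20])
	numDocs = binary.BigEndian.Uint64(mm[n-36 : n-28])
	return
}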
@@ -124,7 +122,7 @@ In case of inverted text index, the dictionary is encoded in [Vellum](https://gi | | | Freq/Norm (chunked) | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | - | +->[ Freq | Norm (float32 under varint) ] | + | +->[ Freq | Norm (float32 under varint) ] | | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | | | | | +------------------------------------------------------------+ | From 2e1faac2d498afc01dd5fe1fd4d3814a12232288 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 01:37:50 +0530 Subject: [PATCH 07/18] small fix in zap.md --- zap.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zap.md b/zap.md index c9dc529d..843a4ad0 100644 --- a/zap.md +++ b/zap.md @@ -122,7 +122,7 @@ In case of inverted text index, the dictionary is encoded in [Vellum](https://gi | | | Freq/Norm (chunked) | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | - | +->[ Freq | Norm (float32 under varint) ] | + | +->[ Freq | Norm (float32 under varint) ] | | | [~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~] | | | | | +------------------------------------------------------------+ | From 6b3309d1a3a227b82c57e0a373c76a5c240c6898 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Thu, 13 Nov 2025 14:38:58 +0530 Subject: [PATCH 08/18] cleanup --- section_faiss_vector_index.go | 1 + section_inverted_text_index.go | 60 ++++++++++++++++++++++++---------- section_synonym_index.go | 12 ++++--- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index ce16756c..7928a8a0 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -773,6 +773,7 @@ func (v *vectorIndexOpaque) Reset() (err error) { v.vecFieldMap = nil v.vecIDMap = nil v.tmp0 = v.tmp0[:0] + v.updatedFields = nil atomic.StoreUint64(&v.bytesWritten, 0) diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index 0862b2e2..98316404 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -747,29 +747,48 @@ func (io *invertedIndexOpaque) process(field index.Field, fieldID uint16, docNum } } -func (i *invertedIndexOpaque) realloc() { - var pidNext int +func (i *invertedIndexOpaque) initDictsAndKeysFromFields() { + numFields := len(i.FieldsInv) - var totTFs int - var totLocs int - i.FieldsMap = map[string]uint16{} - - i.getOrDefineField("_id") // _id field is fieldID 0 + // Resize or allocate Dicts + if cap(i.Dicts) >= numFields { + i.Dicts = i.Dicts[:numFields] + } else { + i.Dicts = make([]map[string]uint64, numFields) + } - for _, result := range i.results { - result.VisitComposite(func(field index.CompositeField) { - i.getOrDefineField(field.Name()) - }) - result.VisitFields(func(field index.Field) { - i.getOrDefineField(field.Name()) - }) + // Resize or allocate DictKeys + if cap(i.DictKeys) >= numFields { + i.DictKeys = i.DictKeys[:numFields] + } else { + i.DictKeys = make([][]string, numFields) } - sort.Strings(i.FieldsInv[1:]) // keep _id as first field + for idx := 0; idx < numFields; idx++ { + // --- Dicts --- + if i.Dicts[idx] == nil { + i.Dicts[idx] = make(map[string]uint64) + } else { + clear(i.Dicts[idx]) + } - for fieldID, fieldName := range i.FieldsInv { - i.FieldsMap[fieldName] = uint16(fieldID + 1) + // --- DictKeys --- + if i.DictKeys[idx] != nil { + i.DictKeys[idx] = i.DictKeys[idx][:0] + } else { + i.DictKeys[idx] = nil + } } +} + +func (i *invertedIndexOpaque) realloc() { + var pidNext int + + var totTFs int + var totLocs int + + // initialize dicts and dict keys from fieldsMap + 
i.initDictsAndKeysFromFields() visitField := func(field index.Field, docNum int) { fieldID := uint16(i.getOrDefineField(field.Name())) @@ -935,7 +954,7 @@ func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int { func (i *invertedTextIndexSection) InitOpaque(args map[string]interface{}) resetable { rv := &invertedIndexOpaque{ fieldAddrs: map[int]int{}, - updatedFields: make(map[string]*index.UpdateFieldInfo), + updatedFields: map[string]*index.UpdateFieldInfo{}, } for k, v := range args { rv.Set(k, v) @@ -1059,6 +1078,9 @@ func (io *invertedIndexOpaque) Reset() (err error) { io.fieldsSame = false io.numDocs = 0 + clear(io.fieldAddrs) + clear(io.updatedFields) + return err } func (i *invertedIndexOpaque) Set(key string, val interface{}) { @@ -1071,6 +1093,8 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) { i.fieldsSame = val.(bool) case "fieldsMap": i.FieldsMap = val.(map[string]uint16) + case "fieldsInv": + i.FieldsInv = val.([]string) case "numDocs": i.numDocs = val.(uint64) case "updatedFields": diff --git a/section_synonym_index.go b/section_synonym_index.go index 3894d1ae..51df3f40 100644 --- a/section_synonym_index.go +++ b/section_synonym_index.go @@ -116,7 +116,8 @@ func (so *synonymIndexOpaque) Reset() (err error) { // cleanup stuff over here so.results = nil so.init = false - so.ThesaurusMap = nil + so.FieldsMap = nil + clear(so.ThesaurusMap) so.ThesaurusInv = so.ThesaurusInv[:0] for i := range so.Thesauri { so.Thesauri[i] = nil @@ -134,9 +135,10 @@ func (so *synonymIndexOpaque) Reset() (err error) { if so.builder != nil { err = so.builder.Reset(&so.builderBuf) } - so.FieldIDtoThesaurusID = nil + clear(so.FieldIDtoThesaurusID) so.SynonymTermToID = so.SynonymTermToID[:0] so.SynonymIDtoTerm = so.SynonymIDtoTerm[:0] + clear(so.thesaurusAddrs) so.tmp0 = so.tmp0[:0] return err @@ -176,8 +178,6 @@ func (so *synonymIndexOpaque) process(field index.SynonymField, fieldID uint16, func (so *synonymIndexOpaque) realloc() { var pidNext int var sidNext uint32 - so.ThesaurusMap = map[string]uint16{} - so.FieldIDtoThesaurusID = map[uint16]int{} // count the number of unique thesauri from the batch of documents for _, result := range so.results { @@ -398,7 +398,9 @@ func (s *synonymIndexSection) getSynonymIndexOpaque(opaque map[int]resetable) *s // results in the opaque before the section processes a synonym field. 
func (s *synonymIndexSection) InitOpaque(args map[string]interface{}) resetable { rv := &synonymIndexOpaque{ - thesaurusAddrs: map[int]int{}, + ThesaurusMap: map[string]uint16{}, + FieldIDtoThesaurusID: map[uint16]int{}, + thesaurusAddrs: map[int]int{}, } for k, v := range args { rv.Set(k, v) From d34dd949c6a418fdad1335adfb77ca6587f72222 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 15:52:51 +0530 Subject: [PATCH 09/18] Add fieldsOptions to manage indexing options for fields --- build.go | 5 +++-- merge.go | 25 +++++++++++++++++++++---- new.go | 27 +++++++++++++++++++++------ segment.go | 18 +++++++++++++++--- write.go | 10 +++++++++- 5 files changed, 69 insertions(+), 16 deletions(-) diff --git a/build.go b/build.go index ee3c3b09..1b30a732 100644 --- a/build.go +++ b/build.go @@ -171,8 +171,9 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), // following fields gets populated by loadFields - fieldsMap: make(map[string]uint16), - fieldsInv: make([]string, 0), + fieldsMap: make(map[string]uint16), + fieldsOptions: make(map[string]index.FieldIndexingOptions), + fieldsInv: make([]string, 0), } sb.updateSize() diff --git a/merge.go b/merge.go index e0fcaf4c..73210fed 100644 --- a/merge.go +++ b/merge.go @@ -121,6 +121,17 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf return fieldsInv[:idx] } +// Remove field options for fields that have been completely deleted +func filterFieldOptions(fieldOptions map[string]index.FieldIndexingOptions, + fieldsMap map[string]uint16) map[string]index.FieldIndexingOptions { + for field := range fieldOptions { + if _, ok := fieldsMap[field]; !ok { + delete(fieldOptions, field) + } + } + return fieldOptions +} + func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) ( newDocNums [][]uint64, numDocs, storedIndexOffset uint64, @@ -128,10 +139,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, err error) { var fieldsSame bool - fieldsSame, fieldsInv = mergeFields(segments) + var fieldsOptions map[string]index.FieldIndexingOptions + fieldsSame, fieldsInv, fieldsOptions = mergeFields(segments) updatedFields := mergeUpdatedFields(segments) fieldsInv = filterFields(fieldsInv, updatedFields) fieldsMap = mapFields(fieldsInv) + fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap) numDocs = computeNewDocCount(segments, drops) @@ -149,6 +162,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, "fieldsMap": fieldsMap, "numDocs": numDocs, "updatedFields": updatedFields, + "fieldsOptions": fieldsOptions, } if numDocs > 0 { @@ -171,7 +185,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, // we can persist the fields section index now, this will point // to the various indexes (each in different section) available for a field. 
- sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque) + sectionsIndexOffset, err = persistFieldsSection(fieldsInv, fieldsOptions, cr, mergeOpaque) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -597,7 +611,7 @@ func (sb *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint6 // input segments, and computes whether the fields are the same across // segments (which depends on fields to be sorted in the same way // across segments) -func mergeFields(segments []*SegmentBase) (bool, []string) { +func mergeFields(segments []*SegmentBase) (bool, []string, map[string]index.FieldIndexingOptions) { fieldsSame := true var segment0Fields []string @@ -606,10 +620,13 @@ func mergeFields(segments []*SegmentBase) (bool, []string) { } fieldsExist := map[string]struct{}{} + fieldOptions := map[string]index.FieldIndexingOptions{} for _, segment := range segments { fields := segment.Fields() for fieldi, field := range fields { fieldsExist[field] = struct{}{} + // record field options + fieldOptions[field] = segment.fieldsOptions[field] if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field { fieldsSame = false } @@ -627,7 +644,7 @@ func mergeFields(segments []*SegmentBase) (bool, []string) { sort.Strings(rv[1:]) // leave _id as first - return fieldsSame, rv + return fieldsSame, rv, fieldOptions } // Combine updateFieldInfo from all segments diff --git a/new.go b/new.go index c99b933d..ce80132c 100644 --- a/new.go +++ b/new.go @@ -102,6 +102,9 @@ type interim struct { // name -> field id + 1 FieldsMap map[string]uint16 + // FieldsOptions holds the indexing options for each field + FieldsOptions map[string]index.FieldIndexingOptions + // FieldsInv is the inverse of FieldsMap // field id -> name FieldsInv []string @@ -127,6 +130,9 @@ func (s *interim) reset() (err error) { for k := range s.FieldsMap { delete(s.FieldsMap, k) } + for k := range s.FieldsOptions { + delete(s.FieldsOptions, k) + } s.FieldsInv = s.FieldsInv[:0] s.metaBuf.Reset() s.tmp0 = s.tmp0[:0] @@ -173,15 +179,23 @@ func (s *interim) convert() (uint64, uint64, error) { if s.FieldsMap == nil { s.FieldsMap = map[string]uint16{} } + if s.FieldsOptions == nil { + s.FieldsOptions = map[string]index.FieldIndexingOptions{} + } s.getOrDefineField("_id") // _id field is fieldID 0 + var fName string for _, result := range s.results { result.VisitComposite(func(field index.CompositeField) { - s.getOrDefineField(field.Name()) + fName = field.Name() + s.getOrDefineField(fName) + s.FieldsOptions[fName] = field.Options() }) result.VisitFields(func(field index.Field) { + fName = field.Name() s.getOrDefineField(field.Name()) + s.FieldsOptions[fName] = field.Options() }) } @@ -192,10 +206,11 @@ func (s *interim) convert() (uint64, uint64, error) { } args := map[string]interface{}{ - "results": s.results, - "chunkMode": s.chunkMode, - "fieldsMap": s.FieldsMap, - "fieldsInv": s.FieldsInv, + "results": s.results, + "chunkMode": s.chunkMode, + "fieldsMap": s.FieldsMap, + "fieldsInv": s.FieldsInv, + "fieldsOptions": s.FieldsOptions, } if s.opaque == nil { s.opaque = map[int]resetable{} @@ -236,7 +251,7 @@ func (s *interim) convert() (uint64, uint64, error) { // we can persist a new fields section here // this new fields section will point to the various indexes available - sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque) + sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.FieldsOptions, s.w, s.opaque) if err != nil { return 0, 0, err } diff --git a/segment.go 
b/segment.go index 38e3b3a1..e2a6c698 100644 --- a/segment.go +++ b/segment.go @@ -54,6 +54,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { rv := &Segment{ SegmentBase: SegmentBase{ fieldsMap: make(map[string]uint16), + fieldsOptions: make(map[string]index.FieldIndexingOptions), invIndexCache: newInvertedIndexCache(), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), @@ -96,9 +97,10 @@ type SegmentBase struct { mem []byte memCRC uint32 chunkMode uint32 - fieldsMap map[string]uint16 // fieldName -> fieldID+1 - fieldsInv []string // fieldID -> fieldName - fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address + fieldsMap map[string]uint16 // fieldName -> fieldID+1 + fieldsOptions map[string]index.FieldIndexingOptions // fieldName -> fieldOptions + fieldsInv []string // fieldID -> fieldName + fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address numDocs uint64 storedIndexOffset uint64 sectionsIndexOffset uint64 @@ -128,6 +130,11 @@ func (sb *SegmentBase) updateSize() { sizeInBytes += (len(k) + SizeOfString) + SizeOfUint16 } + // fieldsOptions + for k := range sb.fieldsOptions { + sizeInBytes += (len(k) + SizeOfString) + SizeOfUint64 + } + // fieldsInv for _, entry := range sb.fieldsInv { sizeInBytes += len(entry) + SizeOfString @@ -341,8 +348,13 @@ func (sb *SegmentBase) loadField(fieldID uint16, pos uint64, fieldName := string(sb.mem[pos : pos+fieldNameLen]) pos += fieldNameLen + // read field options + fieldOptions, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(sz) + sb.fieldsInv = append(sb.fieldsInv, fieldName) sb.fieldsMap[fieldName] = uint16(fieldID + 1) + sb.fieldsOptions[fieldName] = index.FieldIndexingOptions(fieldOptions) fieldNumSections, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(sz) diff --git a/write.go b/write.go index 39382d05..a3c049ea 100644 --- a/write.go +++ b/write.go @@ -19,6 +19,7 @@ import ( "io" "github.com/RoaringBitmap/roaring/v2" + index "github.com/blevesearch/bleve_index_api" ) // writes out the length of the roaring bitmap in bytes as varint @@ -50,7 +51,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, return tw, nil } -func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) { +func persistFieldsSection(fieldsInv []string, fieldsOptions map[string]index.FieldIndexingOptions, w *CountHashWriter, opaque map[int]resetable) (uint64, error) { var rv uint64 fieldsOffsets := make([]uint64, 0, len(fieldsInv)) @@ -70,6 +71,13 @@ func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int return 0, err } + // write out the field options + fieldOpts := fieldsOptions[fieldName] + _, err = writeUvarints(w, uint64(fieldOpts)) + if err != nil { + return 0, err + } + // write out the number of field-specific indexes // FIXME hard-coding to 2, and not attempting to support sparseness well _, err = writeUvarints(w, uint64(len(segmentSections))) From afd61d6632e28151146836c728144eaf4fb9fd78 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 16:18:31 +0530 Subject: [PATCH 10/18] Refactor field name retrieval --- new.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/new.go b/new.go index ce80132c..f4515d0f 100644 --- a/new.go +++ b/new.go @@ -194,7 +194,7 @@ func (s *interim) convert() (uint64, uint64, error) { }) result.VisitFields(func(field index.Field) { fName = field.Name() - s.getOrDefineField(field.Name()) + 
s.getOrDefineField(fName) s.FieldsOptions[fName] = field.Options() }) } From 395dbed3c75329505a11250a4210377d63e9f819 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 12 Nov 2025 19:44:39 +0530 Subject: [PATCH 11/18] Refactor field indexing logic to utilize fieldsOptions over updatedFields --- merge.go | 53 ++++++++++++++++++++++------------ section_faiss_vector_index.go | 20 +++++++------ section_inverted_text_index.go | 33 +++++++++++---------- 3 files changed, 62 insertions(+), 44 deletions(-) diff --git a/merge.go b/merge.go index 73210fed..fadee957 100644 --- a/merge.go +++ b/merge.go @@ -145,6 +145,10 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv = filterFields(fieldsInv, updatedFields) fieldsMap = mapFields(fieldsInv) fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap) + // fieldsSame cannot be true if fields were deleted + if len(updatedFields) > 0 { + fieldsSame = false + } numDocs = computeNewDocCount(segments, drops) @@ -161,13 +165,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, "fieldsSame": fieldsSame, "fieldsMap": fieldsMap, "numDocs": numDocs, - "updatedFields": updatedFields, "fieldsOptions": fieldsOptions, } if numDocs > 0 { storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields) + fieldsMap, fieldsInv, fieldsOptions, fieldsSame, numDocs, cr, closeCh) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -386,8 +389,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo type varintEncoder func(uint64) (int, error) func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, - fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, - w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) { + fieldsMap map[string]uint16, fieldsInv []string, + fieldsOptions map[string]index.FieldIndexingOptions, + fieldsSame bool, newSegDocCount uint64, + w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) { var rv [][]uint64 // The remapped or newDocNums for each segment. 
From 395dbed3c75329505a11250a4210377d63e9f819 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 12 Nov 2025 19:44:39 +0530
Subject: [PATCH 11/18] Refactor field indexing logic to utilize fieldsOptions
 over updatedFields

---
 merge.go                       | 53 ++++++++++++++++++++++------------
 section_faiss_vector_index.go  | 20 +++++++------
 section_inverted_text_index.go | 33 +++++++++++----------
 3 files changed, 62 insertions(+), 44 deletions(-)

diff --git a/merge.go b/merge.go
index 73210fed..fadee957 100644
--- a/merge.go
+++ b/merge.go
@@ -145,6 +145,10 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	fieldsInv = filterFields(fieldsInv, updatedFields)
 	fieldsMap = mapFields(fieldsInv)
 	fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap)
+	// fieldsSame cannot be true if fields were deleted
+	if len(updatedFields) > 0 {
+		fieldsSame = false
+	}
 
 	numDocs = computeNewDocCount(segments, drops)
@@ -161,13 +165,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 		"fieldsSame":    fieldsSame,
 		"fieldsMap":     fieldsMap,
 		"numDocs":       numDocs,
-		"updatedFields": updatedFields,
 		"fieldsOptions": fieldsOptions,
 	}
 
 	if numDocs > 0 {
 		storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
-			fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields)
+			fieldsMap, fieldsInv, fieldsOptions, fieldsSame, numDocs, cr, closeCh)
 		if err != nil {
 			return nil, 0, 0, nil, nil, 0, err
 		}
@@ -386,8 +389,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo
 type varintEncoder func(uint64) (int, error)
 
 func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
-	fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
-	w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) {
+	fieldsMap map[string]uint16, fieldsInv []string,
+	fieldsOptions map[string]index.FieldIndexingOptions,
+	fieldsSame bool, newSegDocCount uint64,
+	w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) {
 	var rv [][]uint64 // The remapped or newDocNums for each segment.
 	var newDocNum uint64
@@ -427,7 +432,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 		// segments and there are no deletions, via byte-copying
 		// of stored docs bytes directly to the writer
 		// cannot copy directly if fields might have been deleted
-		if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 {
+		if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) {
 			err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
 			if err != nil {
 				return 0, nil, err
@@ -470,8 +475,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 				// no entry for field in fieldsMap
 				return false
 			}
-			// early exit if the stored portion of the field is deleted
-			if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store {
+			// early exit if the store is not wanted for this field
+			if !fieldsOptions[field].IsStored() {
 				return true
 			}
 			vals[fieldID] = append(vals[fieldID], value)
@@ -505,8 +510,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 
 		// now walk the non-"_id" fields in order
 		for fieldID := 1; fieldID < len(fieldsInv); fieldID++ {
-			// early exit if the stored portion of the field is deleted
-			if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store {
+			// early exit if no stored values for this field
+			if len(vals[fieldID]) == 0 {
 				continue
 			}
 			storedFieldValues := vals[fieldID]
@@ -625,8 +630,21 @@ func mergeFields(segments []*SegmentBase) (bool, []string, map[string]index.Fiel
 		fields := segment.Fields()
 		for fieldi, field := range fields {
 			fieldsExist[field] = struct{}{}
-			// record field options
-			fieldOptions[field] = segment.fieldsOptions[field]
+
+			if prev, ok := fieldOptions[field]; ok {
+				// Merge options conservatively: once a field option is disabled (bit cleared)
+				// in any segment, it remains disabled. This ensures deterministic behavior
+				// when options can only transition from true -> false.
+				fieldOptions[field] = prev & segment.fieldsOptions[field]
+				// check if any bits were cleared
+				if fieldOptions[field] != prev {
+					// Some bits were cleared (option changed from true -> false)
+					fieldsSame = false
+				}
+			} else {
+				// first occurrence of the field
+				fieldOptions[field] = segment.fieldsOptions[field]
+			}
 			if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
 				fieldsSame = false
 			}
@@ -656,18 +674,15 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn
 		if fieldInfo == nil {
 			fieldInfo = make(map[string]*index.UpdateFieldInfo)
 		}
+		// if field not present, add it
 		if _, ok := fieldInfo[field]; !ok {
 			fieldInfo[field] = &index.UpdateFieldInfo{
-				Deleted:   info.Deleted,
-				Index:     info.Index,
-				Store:     info.Store,
-				DocValues: info.DocValues,
+				// mark whether field is deleted in any segment
+				Deleted: info.Deleted,
 			}
 		} else {
-			fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted
-			fieldInfo[field].Index = fieldInfo[field].Index || info.Index
-			fieldInfo[field].Store = fieldInfo[field].Store || info.Store
-			fieldInfo[field].DocValues = fieldInfo[field].Store || info.DocValues
+			// in case of conflict (mark field deleted if latest segment marks it deleted)
+			fieldInfo[field].Deleted = info.Deleted
 		}
 	}
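The merge rule introduced in mergeFields above is a bitwise AND across segments, so an option bit cleared anywhere stays cleared, and any clearing also disqualifies the fast byte-copy path. A self-contained sketch of that rule (mergeOptions is an illustrative helper, not a zapx function; the real mergeFields additionally compares field ordering when computing fieldsSame):

    package main

    import (
        "fmt"

        index "github.com/blevesearch/bleve_index_api"
    )

    func mergeOptions(segOpts []map[string]index.FieldIndexingOptions) (map[string]index.FieldIndexingOptions, bool) {
        merged := make(map[string]index.FieldIndexingOptions)
        same := true
        for _, opts := range segOpts {
            for field, o := range opts {
                if prev, ok := merged[field]; ok {
                    merged[field] = prev & o // once disabled, stays disabled
                    if merged[field] != prev {
                        same = false // a bit transitioned from true to false
                    }
                } else {
                    merged[field] = o // first occurrence of the field
                }
            }
        }
        return merged, same
    }

    func main() {
        a := map[string]index.FieldIndexingOptions{"title": index.IndexField | index.StoreField}
        b := map[string]index.FieldIndexingOptions{"title": index.IndexField} // store disabled later
        merged, same := mergeOptions([]map[string]index.FieldIndexingOptions{a, b})
        fmt.Println(merged["title"].IsStored(), same) // false false
    }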
diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index 7928a8a0..c53a3b7a 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -105,8 +105,8 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 		if _, ok := sb.fieldsMap[fieldName]; !ok {
 			continue
 		}
-		// early exit if index data is supposed to be deleted
-		if info, ok := vo.updatedFields[fieldName]; ok && info.Index {
+		// early exit if field is is not required to be indexed
+		if !vo.fieldsOptions[fieldName].IsIndexed() {
 			continue
 		}
@@ -690,10 +690,9 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable)
 
 func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable {
 	rv := &vectorIndexOpaque{
-		fieldAddrs:    make(map[uint16]int),
-		vecIDMap:      make(map[int64]*vecInfo),
-		vecFieldMap:   make(map[uint16]*indexContent),
-		updatedFields: make(map[string]*index.UpdateFieldInfo),
+		fieldAddrs:  make(map[uint16]int),
+		vecIDMap:    make(map[int64]*vecInfo),
+		vecFieldMap: make(map[uint16]*indexContent),
 	}
 	for k, v := range args {
 		rv.Set(k, v)
@@ -732,7 +731,8 @@ type vectorIndexOpaque struct {
 	// index to be build.
 	vecFieldMap map[uint16]*indexContent
 
-	updatedFields map[string]*index.UpdateFieldInfo
+	// field indexing options
+	fieldsOptions map[string]index.FieldIndexingOptions
 
 	tmp0 []byte
 }
@@ -775,6 +775,8 @@ func (v *vectorIndexOpaque) Reset() (err error) {
 	v.tmp0 = v.tmp0[:0]
 	v.updatedFields = nil
 
+	v.fieldsOptions = nil
+
 	atomic.StoreUint64(&v.bytesWritten, 0)
 
 	return nil
@@ -782,7 +784,7 @@ func (v *vectorIndexOpaque) Set(key string, val interface{}) {
 	switch key {
-	case "updatedFields":
-		v.updatedFields = val.(map[string]*index.UpdateFieldInfo)
+	case "fieldsOptions":
+		v.fieldsOptions = val.(map[string]index.FieldIndexingOptions)
 	}
 }
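fieldsOptions reaches the vector section the same way fieldsMap and numDocs do: mergeToWriter publishes values into a generic args map, and each section's Set type-asserts them back out. A condensed sketch of that plumbing pattern (sectionOpaque is an invented stand-in for the real opaque types):

    package main

    import (
        "fmt"

        index "github.com/blevesearch/bleve_index_api"
    )

    type sectionOpaque struct {
        fieldsOptions map[string]index.FieldIndexingOptions
        numDocs       uint64
    }

    // Set mirrors the switch-on-key style used by the real opaques.
    func (s *sectionOpaque) Set(key string, val interface{}) {
        switch key {
        case "fieldsOptions":
            s.fieldsOptions = val.(map[string]index.FieldIndexingOptions)
        case "numDocs":
            s.numDocs = val.(uint64)
        }
    }

    func main() {
        args := map[string]interface{}{
            "fieldsOptions": map[string]index.FieldIndexingOptions{"body": index.IndexField},
            "numDocs":       uint64(42),
        }
        so := &sectionOpaque{}
        for k, v := range args {
            so.Set(k, v)
        }
        fmt.Println(so.fieldsOptions["body"].IsIndexed(), so.numDocs) // true 42
    }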
diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go
index 98316404..b6d6fd6c 100644
--- a/section_inverted_text_index.go
+++ b/section_inverted_text_index.go
@@ -80,9 +80,8 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI
 }
 
 func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
-	fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool,
-	newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32,
-	updatedFields map[string]*index.UpdateFieldInfo, w *CountHashWriter,
+	fieldsInv []string, fieldsMap map[string]uint16, fieldsOptions map[string]index.FieldIndexingOptions,
+	fieldsSame bool, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32, w *CountHashWriter,
 	closeCh chan struct{}) (map[int]int, uint64, error) {
 	var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
 	var bufLoc []uint64
@@ -126,8 +125,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
 		if isClosed(closeCh) {
 			return nil, 0, seg.ErrClosed
 		}
-		// early exit if index data is supposed to be deleted
-		if info, ok := updatedFields[fieldName]; ok && info.Index {
+		// early exit if the field's index option is false
+		if !fieldsOptions[fieldName].IsIndexed() {
 			continue
 		}
@@ -249,8 +248,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
 
 			postItr = postings.iterator(true, true, true, postItr)
 
-			// can only safely copy data if no field data has been deleted
-			if fieldsSame && len(updatedFields) == 0 {
+			// can only safely copy data if all segments have same fields
+			if fieldsSame {
 				// can optimize by copying freq/norm/loc bytes directly
 				lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying(
 					term, postItr, newDocNums[itrI], newRoaring,
@@ -323,8 +322,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
 			if isClosed(closeCh) {
 				return nil, 0, seg.ErrClosed
 			}
-			// early exit if docvalues data is supposed to be deleted
-			if info, ok := updatedFields[fieldName]; ok && info.DocValues {
+			// early exit if docvalues are not wanted for this field
+			if !fieldsOptions[fieldName].IncludeDocValues() {
 				continue
 			}
 			fieldIDPlus1 := uint16(segment.fieldsMap[fieldName])
@@ -407,7 +406,7 @@ func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*S
 	w *CountHashWriter, closeCh chan struct{}) error {
 	io := i.getInvertedIndexOpaque(opaque)
 	fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv,
-		io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh)
+		io.FieldsMap, io.FieldsOptions, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh)
 	if err != nil {
 		return err
 	}
@@ -953,8 +952,7 @@ func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int {
 
 func (i *invertedTextIndexSection) InitOpaque(args map[string]interface{}) resetable {
 	rv := &invertedIndexOpaque{
-		fieldAddrs:    map[int]int{},
-		updatedFields: map[string]*index.UpdateFieldInfo{},
+		fieldAddrs: map[int]int{},
 	}
 	for k, v := range args {
 		rv.Set(k, v)
@@ -981,6 +979,10 @@ type invertedIndexOpaque struct {
 	// field id -> name
 	FieldsInv []string
 
+	// Field indexing options
+	// field name -> options
+	FieldsOptions map[string]index.FieldIndexingOptions
+
 	// Term dictionaries for each field
 	// field id -> term -> postings list id + 1
 	Dicts []map[string]uint64
@@ -1023,8 +1025,6 @@ type invertedIndexOpaque struct {
 
 	fieldAddrs map[int]int
 
-	updatedFields map[string]*index.UpdateFieldInfo
-
 	fieldsSame bool
 	numDocs    uint64
 }
@@ -1035,6 +1035,7 @@ func (io *invertedIndexOpaque) Reset() (err error) {
 	io.init = false
 	io.chunkMode = 0
 	io.FieldsMap = nil
+	io.FieldsOptions = nil
 	io.FieldsInv = nil
 	for i := range io.Dicts {
 		io.Dicts[i] = nil
@@ -1093,11 +1094,11 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) {
 		i.fieldsSame = val.(bool)
 	case "fieldsMap":
 		i.FieldsMap = val.(map[string]uint16)
+	case "fieldsOptions":
+		i.FieldsOptions = val.(map[string]index.FieldIndexingOptions)
 	case "fieldsInv":
 		i.FieldsInv = val.([]string)
 	case "numDocs":
 		i.numDocs = val.(uint64)
-	case "updatedFields":
-		i.updatedFields = val.(map[string]*index.UpdateFieldInfo)
 	}
 }
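Taken together, this patch replaces every updatedFields lookup on the merge path with a check of the surviving options: postings work is gated on IsIndexed, the docvalues pass on IncludeDocValues, and stored fields on IsStored. A compact illustration of the gating, with invented field names and option values:

    package main

    import (
        "fmt"

        index "github.com/blevesearch/bleve_index_api"
    )

    func main() {
        fieldsInv := []string{"_id", "title", "vec"}
        fieldsOptions := map[string]index.FieldIndexingOptions{
            "_id":   index.IndexField | index.StoreField,
            "title": index.IndexField | index.DocValues,
            "vec":   index.StoreField, // indexing disabled by a field update
        }

        for _, fieldName := range fieldsInv {
            opts := fieldsOptions[fieldName]
            // the inverted text and vector sections skip fields whose index option is off
            if !opts.IsIndexed() {
                fmt.Println("skip postings for", fieldName)
                continue
            }
            // the docvalues pass is gated separately
            if !opts.IncludeDocValues() {
                fmt.Println("skip docvalues for", fieldName)
            }
        }
    }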
From f1a3e7763253b8bef9d439be976f76b941c09a4e Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 12 Nov 2025 21:08:48 +0530
Subject: [PATCH 12/18] md changes

---
 zap.md | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/zap.md b/zap.md
index 843a4ad0..aa1afda0 100644
--- a/zap.md
+++ b/zap.md
@@ -99,16 +99,17 @@ Sections Index is a set of NF uint64 addresses (0 through F# - 1) each of which
 
                + Sections Info +                                            + Sections Index +
 |============================================================================|=====================================|
 |                                                                            |                                     |
-| +---------+---------+-----+---------+---------+~~~~~~~~+~~~~~~~~+--+...+-+ | +-------+--------+...+------+-----+ |
-+----> S1 Addr | S1 Type | ... | Sn Addr | Sn Type | NS | Length | Name |  | |    0  |    1   |   | F#-1 |  NF | |
-| | +---------+---------+-----+---------+---------+~~~~~~~~+~~~~~~~~+--+...+-+ | +-------+----+---+...+------+-----+ |
+| +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+--------+...+------+-----+ |
++----> S1 Addr | S1 Type | ... | Sn Addr | Sn Type | NS | O | Name | Length | | |   0   |    1   |   | F#-1 | NF | |
+| | +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+----+---+...+------+-----+ |
 | |                                                                          |              |                      |
 | +============================================================================+==============|======================+
 | |
-+----------------------------------------------------------------------------------------------+
++----------------------------------------------------,------------------------------------------+
 
 NF. Number of fields
 NS. Number of index sections
+O. Field Indexing Options
 Sn. nth index section
 
 ## Inverted Text Index Section
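To reach one of these records, a reader indexes into the Sections Index pictured above: entry f of the NF fixed-width addresses points at field f's record in the Sections Info. A sketch under the assumption that entries are 8-byte big-endian words, matching how zap encodes its other fixed-width offsets:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // sectionsInfoAddr returns the Sections Info offset for a field, reading
    // entry fieldID of the NF fixed-width addresses in the Sections Index.
    func sectionsInfoAddr(mem []byte, sectionsIndexOffset, fieldID uint64) uint64 {
        entry := sectionsIndexOffset + fieldID*8
        return binary.BigEndian.Uint64(mem[entry : entry+8])
    }

    func main() {
        // a fake two-entry index: field 0 -> 0x10, field 1 -> 0x40
        mem := make([]byte, 16)
        binary.BigEndian.PutUint64(mem[0:], 0x10)
        binary.BigEndian.PutUint64(mem[8:], 0x40)
        fmt.Printf("0x%x\n", sectionsInfoAddr(mem, 0, 1)) // 0x40
    }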
From b1a32aa7f1b1f5054f61bd354d8823349e32b093 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Thu, 13 Nov 2025 01:58:40 +0530
Subject: [PATCH 13/18] pass test

---
 merge.go | 46 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git a/merge.go b/merge.go
index fadee957..287fe1ed 100644
--- a/merge.go
+++ b/merge.go
@@ -121,12 +121,33 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf
 	return fieldsInv[:idx]
 }
 
-// Remove field options for fields that have been completely deleted
-func filterFieldOptions(fieldOptions map[string]index.FieldIndexingOptions,
-	fieldsMap map[string]uint16) map[string]index.FieldIndexingOptions {
-	for field := range fieldOptions {
-		if _, ok := fieldsMap[field]; !ok {
-			delete(fieldOptions, field)
+// Remove field options for fields using updateFieldInfo to override the
+// options selected during mergeFields, if needed. This is mainly
+// for the case where there is a field option update which has not been
+// propogated yet, because a new segment has not been created yet.
+func finalizeFieldOptions(fieldOptions map[string]index.FieldIndexingOptions,
+	updatedFields map[string]*index.UpdateFieldInfo) map[string]index.FieldIndexingOptions {
+	for field, opts := range fieldOptions {
+		if info, ok := updatedFields[field]; ok {
+			// if field is deleted, remove its options
+			if info.Deleted {
+				delete(fieldOptions, field)
+				continue
+			}
+			// otherwise, update options based on info
+			if info.Index {
+				// ensure indexing is disabled
+				opts &^= index.IndexField
+			}
+			if info.Store {
+				// ensure storing is disabled
+				opts &^= index.StoreField
+			}
+			if info.DocValues {
+				// ensure doc values is disabled
+				opts &^= index.DocValues
+			}
+			fieldOptions[field] = opts
 		}
 	}
 	return fieldOptions
@@ -144,7 +165,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	updatedFields := mergeUpdatedFields(segments)
 	fieldsInv = filterFields(fieldsInv, updatedFields)
 	fieldsMap = mapFields(fieldsInv)
-	fieldsOptions = filterFieldOptions(fieldsOptions, fieldsMap)
+	fieldsOptions = finalizeFieldOptions(fieldsOptions, updatedFields)
 	// fieldsSame cannot be true if fields were deleted
 	if len(updatedFields) > 0 {
 		fieldsSame = false
@@ -678,11 +699,16 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn
 		// if field not present, add it
 		if _, ok := fieldInfo[field]; !ok {
 			fieldInfo[field] = &index.UpdateFieldInfo{
 				// mark whether field is deleted in any segment
-				Deleted: info.Deleted,
+				Deleted:   info.Deleted,
+				Index:     info.Index,
+				Store:     info.Store,
+				DocValues: info.DocValues,
 			}
 		} else {
-			// in case of conflict (mark field deleted if latest segment marks it deleted)
-			fieldInfo[field].Deleted = info.Deleted
+			fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted
+			fieldInfo[field].Index = fieldInfo[field].Index || info.Index
+			fieldInfo[field].Store = fieldInfo[field].Store || info.Store
+			fieldInfo[field].DocValues = fieldInfo[field].DocValues || info.DocValues
 		}
 	}
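The override semantics are easiest to see on concrete data. The following runnable walk-through applies the same logic finalizeFieldOptions just gained, with invented fields and update flags:

    package main

    import (
        "fmt"

        index "github.com/blevesearch/bleve_index_api"
    )

    func main() {
        fieldOptions := map[string]index.FieldIndexingOptions{
            "title": index.IndexField | index.StoreField | index.DocValues,
            "junk":  index.IndexField,
        }
        updated := map[string]*index.UpdateFieldInfo{
            "title": {Store: true},   // stored portion is being removed
            "junk":  {Deleted: true}, // whole field is being removed
        }

        for field, opts := range fieldOptions {
            info, ok := updated[field]
            if !ok {
                continue
            }
            if info.Deleted {
                delete(fieldOptions, field) // deleted fields lose their options entirely
                continue
            }
            // clear only the bits the update disables
            if info.Index {
                opts &^= index.IndexField
            }
            if info.Store {
                opts &^= index.StoreField
            }
            if info.DocValues {
                opts &^= index.DocValues
            }
            fieldOptions[field] = opts
        }

        fmt.Println(fieldOptions["title"].IsStored()) // false
        _, junkRemains := fieldOptions["junk"]
        fmt.Println(junkRemains) // false
    }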
From 46d5600f73c14f4c2d81181556f23306b4b277ac Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Thu, 13 Nov 2025 14:45:28 +0530
Subject: [PATCH 14/18] fix merge conflict

---
 section_faiss_vector_index.go  | 1 -
 section_inverted_text_index.go | 1 -
 2 files changed, 2 deletions(-)

diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index c53a3b7a..8610563b 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -773,7 +773,6 @@ func (v *vectorIndexOpaque) Reset() (err error) {
 	v.vecFieldMap = nil
 	v.vecIDMap = nil
 	v.tmp0 = v.tmp0[:0]
-	v.updatedFields = nil
 
 	v.fieldsOptions = nil
 
diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go
index b6d6fd6c..2d943f87 100644
--- a/section_inverted_text_index.go
+++ b/section_inverted_text_index.go
@@ -1080,7 +1080,6 @@ func (io *invertedIndexOpaque) Reset() (err error) {
 	io.numDocs = 0
 
 	clear(io.fieldAddrs)
-	clear(io.updatedFields)
 
 	return err
 }

From 13cd3e32073d8154499b70eb473de2bef65677c5 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 19 Nov 2025 17:24:20 +0530
Subject: [PATCH 15/18] Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 merge.go                      | 2 +-
 section_faiss_vector_index.go | 1 -
 zap.md                        | 2 +-
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/merge.go b/merge.go
index 287fe1ed..f3c8e481 100644
--- a/merge.go
+++ b/merge.go
@@ -124,7 +124,7 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf
 // Remove field options for fields using updateFieldInfo to override the
 // options selected during mergeFields, if needed. This is mainly
 // for the case where there is a field option update which has not been
-// propogated yet, because a new segment has not been created yet.
+// propagated yet, because a new segment has not been created yet.
 func finalizeFieldOptions(fieldOptions map[string]index.FieldIndexingOptions,
 	updatedFields map[string]*index.UpdateFieldInfo) map[string]index.FieldIndexingOptions {
 	for field, opts := range fieldOptions {
diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index c53a3b7a..8610563b 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -773,7 +773,6 @@ func (v *vectorIndexOpaque) Reset() (err error) {
 	v.vecFieldMap = nil
 	v.vecIDMap = nil
 	v.tmp0 = v.tmp0[:0]
-	v.updatedFields = nil
 
 	v.fieldsOptions = nil
 
diff --git a/zap.md b/zap.md
index aa1afda0..e0495992 100644
--- a/zap.md
+++ b/zap.md
@@ -100,7 +100,7 @@ Sections Index is a set of NF uint64 addresses (0 through F# - 1) each of which
 |============================================================================|=====================================|
 |                                                                            |                                     |
 | +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+--------+...+------+-----+ |
-+----> S1 Addr | S1 Type | ... | Sn Addr | Sn Type | NS | O | Name | Length | | |   0   |    1   |   | F#-1 | NF | |
++----> Length | Name | O | NS | S1 Type | S1 Addr | ... | Sn Type | Sn Addr | | |   0   |    1   |   | F#-1 | NF | |
 | | +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+----+---+...+------+-----+ |
 | |                                                                          |              |                      |
 | +============================================================================+==============|======================+
From e54f190cb261f2bcd1c1d81690660e9f2eaed1b0 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 19 Nov 2025 17:30:05 +0530
Subject: [PATCH 16/18] fix md

---
 zap.md | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/zap.md b/zap.md
index e0495992..a7d308af 100644
--- a/zap.md
+++ b/zap.md
@@ -95,17 +95,17 @@ Stored Fields Data is an arbitrary size record, which consists of metadata and [
 Sections Index is a set of NF uint64 addresses (0 through F# - 1) each of which are offsets to the records in the Sections Info. Inside the sections info, we have further offsets to specific type of index section for that particular field in the segment file. For example, field 0 may correspond to Vector Indexing and its records would have offsets to the Vector Index Section whereas a field 1 may correspond to Text Indexing and its records would rather point to somewhere within the Inverted Text Index Section.
 
-     (...)                                  [F]                                   [F + F#]
-               + Sections Info +                                            + Sections Index +
-|============================================================================|=====================================|
-|                                                                            |                                     |
-| +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+--------+...+------+-----+ |
-+----> Length | Name | O | NS | S1 Type | S1 Addr | ... | Sn Type | Sn Addr | | |   0   |    1   |   | F#-1 | NF | |
-| | +---------+---------+-----+---------+---------+~~~~~~~~+-+...+-+~~~~~~~~+ | +-------+----+---+...+------+-----+ |
-| |                                                                          |              |                      |
-| +============================================================================+==============|======================+
-| |
-+----------------------------------------------------,------------------------------------------+
+     (...)                                 [F]                                [F + F#]
+              + Sections Info +                                         + Sections Index +
+ |===========================================================================|=================================|
+ |                                                                           |                                 |
+ | +--------+------+---+----+---------+---------+~~~~~+--+...+--+~~~~~~~~~+ | +------+------+...+------+----+ |
+ +---->| Length | Name | O | NS | S1 Type | S1 Addr | ... | Sn Type | Sn Addr | | |  0  |  1  |  | F#-1 | NF | |
+ | | +--------+------+---+----+---------+---------+~~~~~+--+...+--+~~~~~~~~~+ | +------+----+-+...+------+----+ |
+ | |                                                                         |             |                   |
+ | +===========================================================================+=============|===================+
+ | |
+ +--------------------------------------------------------------------------------------------+
 
 NF. Number of fields
 NS. Number of index sections
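With the corrected diagram, a field record reads back as a straight walk of varints: Length, Name, O, NS, then NS pairs of section type and address. The sketch below assumes uvarint encoding for everything except the name bytes, matching the write path added in the earlier write.go hunk; the exact widths of the section type and address entries are an assumption here, and the real loadField additionally does bounds handling against the mmap'd segment:

    package main

    import (
        "encoding/binary"
        "fmt"

        index "github.com/blevesearch/bleve_index_api"
    )

    // readFieldRecord decodes one Sections Info record in the pictured order:
    // name length, name, options (O), section count (NS), then NS pairs of
    // section type and address.
    func readFieldRecord(mem []byte) (string, index.FieldIndexingOptions, map[uint64]uint64) {
        var pos uint64

        nameLen, n := binary.Uvarint(mem[pos:])
        pos += uint64(n)
        name := string(mem[pos : pos+nameLen])
        pos += nameLen

        opts, n := binary.Uvarint(mem[pos:])
        pos += uint64(n)

        numSections, n := binary.Uvarint(mem[pos:])
        pos += uint64(n)

        sections := make(map[uint64]uint64, numSections)
        for i := uint64(0); i < numSections; i++ {
            typ, n := binary.Uvarint(mem[pos:])
            pos += uint64(n)
            addr, n := binary.Uvarint(mem[pos:])
            pos += uint64(n)
            sections[typ] = addr
        }
        return name, index.FieldIndexingOptions(opts), sections
    }

    func main() {
        // build a sample record for a field named "title"
        var buf []byte
        put := func(v uint64) {
            tmp := make([]byte, binary.MaxVarintLen64)
            buf = append(buf, tmp[:binary.PutUvarint(tmp, v)]...)
        }
        put(5)
        buf = append(buf, "title"...)
        put(uint64(index.IndexField | index.StoreField)) // O
        put(1)    // NS
        put(0)    // S1 Type
        put(1024) // S1 Addr
        name, opts, sections := readFieldRecord(buf)
        fmt.Println(name, opts.IsStored(), sections) // title true map[0:1024]
    }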
 NS. Number of index sections

From b57dc7d4bf9f334c14efb047f920d029fc73f569 Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Wed, 19 Nov 2025 17:31:03 +0530
Subject: [PATCH 17/18] Update section_faiss_vector_index.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 section_faiss_vector_index.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index 8610563b..3a6e8fc0 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -105,7 +105,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 		if _, ok := sb.fieldsMap[fieldName]; !ok {
 			continue
 		}
-		// early exit if field is is not required to be indexed
+		// early exit if field is not required to be indexed
 		if !vo.fieldsOptions[fieldName].IsIndexed() {
 			continue
 		}

From e2e53bd27bb50f0d42e65963e4df2d32f674341f Mon Sep 17 00:00:00 2001
From: Rahul Rampure
Date: Thu, 20 Nov 2025 11:17:18 +0530
Subject: [PATCH 18/18] Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 merge.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/merge.go b/merge.go
index f3c8e481..bfbe20c2 100644
--- a/merge.go
+++ b/merge.go
@@ -121,10 +121,10 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf
 	return fieldsInv[:idx]
 }
 
-// Remove field options for fields using updateFieldInfo to override the
-// options selected during mergeFields, if needed. This is mainly
-// for the case where there is a field option update which has not been
-// propagated yet, because a new segment has not been created yet.
+// Update field options using updateFieldInfo to override the options
+// selected during mergeFields, if needed. This includes removing field
+// options for deleted fields and updating options for fields with changes
+// that have not yet been propagated because a new segment has not been created.
 func finalizeFieldOptions(fieldOptions map[string]index.FieldIndexingOptions,
 	updatedFields map[string]*index.UpdateFieldInfo) map[string]index.FieldIndexingOptions {
 	for field, opts := range fieldOptions {
@@ -531,6 +531,10 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 
 		// now walk the non-"_id" fields in order
 		for fieldID := 1; fieldID < len(fieldsInv); fieldID++ {
+			// early exit if the store is not wanted for this field
+			if !fieldsOptions[fieldsInv[fieldID]].IsStored() {
+				continue
+			}
 			// early exit if no stored values for this field
 			if len(vals[fieldID]) == 0 {
 				continue