diff --git a/build.go b/build.go
index ee3c3b09..1b30a732 100644
--- a/build.go
+++ b/build.go
@@ -171,8 +171,9 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
 		vecIndexCache: newVectorIndexCache(),
 		synIndexCache: newSynonymIndexCache(),
 		// following fields gets populated by loadFields
-		fieldsMap: make(map[string]uint16),
-		fieldsInv: make([]string, 0),
+		fieldsMap:     make(map[string]uint16),
+		fieldsOptions: make(map[string]index.FieldIndexingOptions),
+		fieldsInv:     make([]string, 0),
 	}
 	sb.updateSize()
diff --git a/merge.go b/merge.go
index e0fcaf4c..bfbe20c2 100644
--- a/merge.go
+++ b/merge.go
@@ -121,6 +121,38 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf
 	return fieldsInv[:idx]
 }
 
+// finalizeFieldOptions overrides the options selected during mergeFields
+// using updatedFields, if needed. This includes removing the options of
+// deleted fields and disabling options for fields whose updates have not
+// yet been propagated because a new segment has not been created.
+func finalizeFieldOptions(fieldOptions map[string]index.FieldIndexingOptions,
+	updatedFields map[string]*index.UpdateFieldInfo) map[string]index.FieldIndexingOptions {
+	for field, opts := range fieldOptions {
+		if info, ok := updatedFields[field]; ok {
+			// if field is deleted, remove its options
+			if info.Deleted {
+				delete(fieldOptions, field)
+				continue
+			}
+			// otherwise, update options based on info
+			if info.Index {
+				// ensure indexing is disabled
+				opts &^= index.IndexField
+			}
+			if info.Store {
+				// ensure storing is disabled
+				opts &^= index.StoreField
+			}
+			if info.DocValues {
+				// ensure doc values are disabled
+				opts &^= index.DocValues
+			}
+			fieldOptions[field] = opts
+		}
+	}
+	return fieldOptions
+}
+
 func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
 	newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
@@ -128,10 +160,16 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	err error) {
 	var fieldsSame bool
-	fieldsSame, fieldsInv = mergeFields(segments)
+	var fieldsOptions map[string]index.FieldIndexingOptions
+	fieldsSame, fieldsInv, fieldsOptions = mergeFields(segments)
 	updatedFields := mergeUpdatedFields(segments)
 	fieldsInv = filterFields(fieldsInv, updatedFields)
 	fieldsMap = mapFields(fieldsInv)
+	fieldsOptions = finalizeFieldOptions(fieldsOptions, updatedFields)
+	// fieldsSame cannot remain true if any fields were deleted or updated
+	if len(updatedFields) > 0 {
+		fieldsSame = false
+	}
 
 	numDocs = computeNewDocCount(segments, drops)
@@ -148,12 +186,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 		"fieldsSame": fieldsSame,
 		"fieldsMap":  fieldsMap,
 		"numDocs":    numDocs,
-		"updatedFields": updatedFields,
+		"fieldsOptions": fieldsOptions,
 	}
 
 	if numDocs > 0 {
 		storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
-			fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields)
+			fieldsMap, fieldsInv, fieldsOptions, fieldsSame, numDocs, cr, closeCh)
 		if err != nil {
 			return nil, 0, 0, nil, nil, 0, err
 		}
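The override in finalizeFieldOptions is plain bit clearing: Go's `&^` (AND NOT) operator drops a single flag from the options bitmask while leaving the others intact. A minimal standalone sketch of that idea, using local stand-in types rather than the real index.FieldIndexingOptions and index.UpdateFieldInfo:

    package main

    import "fmt"

    // Stand-in bit flags; the real values live in bleve_index_api.
    type options uint64

    const (
        optIndex options = 1 << iota
        optStore
        optDocValues
    )

    // Stand-in for index.UpdateFieldInfo: a true flag means that part of the
    // field's data should no longer be kept.
    type updateInfo struct {
        Deleted, Index, Store, DocValues bool
    }

    // finalize mirrors finalizeFieldOptions for a single field.
    func finalize(opts options, info updateInfo) (options, bool) {
        if info.Deleted {
            return 0, false // drop the field's options entirely
        }
        if info.Index {
            opts &^= optIndex // clear only the index bit
        }
        if info.Store {
            opts &^= optStore
        }
        if info.DocValues {
            opts &^= optDocValues
        }
        return opts, true
    }

    func main() {
        opts, kept := finalize(optIndex|optStore|optDocValues, updateInfo{Store: true})
        fmt.Println(kept, opts&optIndex != 0, opts&optStore != 0, opts&optDocValues != 0)
        // prints: true true false true
    }
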
@@ -171,7 +209,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	// we can persist the fields section index now, this will point
 	// to the various indexes (each in different section) available for a field.
-	sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque)
+	sectionsIndexOffset, err = persistFieldsSection(fieldsInv, fieldsOptions, cr, mergeOpaque)
 	if err != nil {
 		return nil, 0, 0, nil, nil, 0, err
 	}
@@ -372,8 +410,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo
 
 type varintEncoder func(uint64) (int, error)
 
 func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
-	fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
-	w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) {
+	fieldsMap map[string]uint16, fieldsInv []string,
+	fieldsOptions map[string]index.FieldIndexingOptions,
+	fieldsSame bool, newSegDocCount uint64,
+	w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) {
 	var rv [][]uint64 // The remapped or newDocNums for each segment.
 
 	var newDocNum uint64
@@ -413,7 +453,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 		// segments and there are no deletions, via byte-copying
 		// of stored docs bytes directly to the writer
 		// cannot copy directly if fields might have been deleted
-		if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 {
+		if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) {
 			err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
 			if err != nil {
 				return 0, nil, err
 			}
@@ -456,8 +496,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 				// no entry for field in fieldsMap
 				return false
 			}
-			// early exit if the stored portion of the field is deleted
-			if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store {
+			// early exit if storing is not enabled for this field
+			if !fieldsOptions[field].IsStored() {
 				return true
 			}
 			vals[fieldID] = append(vals[fieldID], value)
@@ -491,8 +531,12 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 			// now walk the non-"_id" fields in order
 			for fieldID := 1; fieldID < len(fieldsInv); fieldID++ {
-				// early exit if the stored portion of the field is deleted
-				if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store {
+				// early exit if storing is not enabled for this field
+				if !fieldsOptions[fieldsInv[fieldID]].IsStored() {
+					continue
+				}
+				// early exit if no stored values for this field
+				if len(vals[fieldID]) == 0 {
 					continue
 				}
 				storedFieldValues := vals[fieldID]
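The early exits above consult the merged per-field options through accessor methods instead of looking fields up in updatedFields. A stand-in sketch of how such accessors behave — note that looking up an unknown field returns the zero value, so every accessor reports false and the field is simply skipped (the real type and methods come from bleve_index_api; the flag values here are illustrative):

    package main

    import "fmt"

    // Stand-in for index.FieldIndexingOptions with illustrative flag values.
    type FieldIndexingOptions uint64

    const (
        IndexField FieldIndexingOptions = 1 << iota
        StoreField
        DocValues
    )

    func (o FieldIndexingOptions) IsIndexed() bool        { return o&IndexField != 0 }
    func (o FieldIndexingOptions) IsStored() bool         { return o&StoreField != 0 }
    func (o FieldIndexingOptions) IncludeDocValues() bool { return o&DocValues != 0 }

    func main() {
        fieldsOptions := map[string]FieldIndexingOptions{
            "title": IndexField | StoreField,
            "vec":   IndexField, // indexed but not stored
        }
        for _, field := range []string{"title", "vec", "missing"} {
            // A lookup of an unknown field returns the zero value, so all
            // accessors report false and the field is skipped.
            fmt.Println(field, "stored:", fieldsOptions[field].IsStored())
        }
    }
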
@@ -597,7 +641,7 @@ func (sb *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint6
 // input segments, and computes whether the fields are the same across
 // segments (which depends on fields to be sorted in the same way
 // across segments)
-func mergeFields(segments []*SegmentBase) (bool, []string) {
+func mergeFields(segments []*SegmentBase) (bool, []string, map[string]index.FieldIndexingOptions) {
 	fieldsSame := true
 
 	var segment0Fields []string
@@ -606,10 +650,26 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
 	}
 
 	fieldsExist := map[string]struct{}{}
+	fieldOptions := map[string]index.FieldIndexingOptions{}
 	for _, segment := range segments {
 		fields := segment.Fields()
 		for fieldi, field := range fields {
 			fieldsExist[field] = struct{}{}
+
+			if prev, ok := fieldOptions[field]; ok {
+				// Merge options conservatively: once an option is disabled (bit
+				// cleared) in any segment, it stays disabled, since options only
+				// ever transition from enabled to disabled.
+				fieldOptions[field] = prev & segment.fieldsOptions[field]
+				// check if any bits were cleared
+				if fieldOptions[field] != prev {
+					// some bits were cleared, so this segment disabled an option
+					fieldsSame = false
+				}
+			} else {
+				// first occurrence of the field
+				fieldOptions[field] = segment.fieldsOptions[field]
+			}
 			if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
 				fieldsSame = false
 			}
@@ -627,7 +687,7 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
 
 	sort.Strings(rv[1:]) // leave _id as first
 
-	return fieldsSame, rv
+	return fieldsSame, rv, fieldOptions
 }
 
 // Combine updateFieldInfo from all segments
@@ -639,8 +699,10 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn
 			if fieldInfo == nil {
 				fieldInfo = make(map[string]*index.UpdateFieldInfo)
 			}
+			// if the field is not present yet, add it
 			if _, ok := fieldInfo[field]; !ok {
 				fieldInfo[field] = &index.UpdateFieldInfo{
+					// copy this segment's update flags
 					Deleted:   info.Deleted,
 					Index:     info.Index,
 					Store:     info.Store,
@@ -650,7 +712,7 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn
 				fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted
 				fieldInfo[field].Index = fieldInfo[field].Index || info.Index
 				fieldInfo[field].Store = fieldInfo[field].Store || info.Store
-				fieldInfo[field].DocValues = fieldInfo[field].Store || info.DocValues
+				fieldInfo[field].DocValues = fieldInfo[field].DocValues || info.DocValues
 			}
 		}
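mergeFields intersects the per-segment option bits with `&`, so an option survives the merge only if every input segment still has it enabled, and any cleared bit also forces fieldsSame to false; mergeUpdatedFields, by contrast, ORs its flags so that a removal requested by any segment wins. A standalone sketch of the conservative intersection over stand-in bitmasks:

    package main

    import "fmt"

    type options uint64

    const (
        optIndex options = 1 << iota
        optStore
        optDocValues
    )

    // intersect mirrors the conservative merge in mergeFields: AND the bitmasks
    // together and report whether any bit was cleared along the way.
    func intersect(perSegment []options) (merged options, same bool) {
        same = true
        for i, o := range perSegment {
            if i == 0 {
                merged = o
                continue
            }
            prev := merged
            merged &= o
            if merged != prev {
                same = false // an option was disabled in a later segment
            }
        }
        return merged, same
    }

    func main() {
        merged, same := intersect([]options{
            optIndex | optStore | optDocValues,
            optIndex | optDocValues, // storing was disabled in this segment
        })
        fmt.Println(merged&optStore != 0, same) // false false
    }
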
diff --git a/new.go b/new.go
index c99b933d..f4515d0f 100644
--- a/new.go
+++ b/new.go
@@ -102,6 +102,9 @@ type interim struct {
 	// name -> field id + 1
 	FieldsMap map[string]uint16
 
+	// FieldsOptions holds the indexing options for each field
+	FieldsOptions map[string]index.FieldIndexingOptions
+
 	// FieldsInv is the inverse of FieldsMap
 	// field id -> name
 	FieldsInv []string
@@ -127,6 +130,9 @@ func (s *interim) reset() (err error) {
 	for k := range s.FieldsMap {
 		delete(s.FieldsMap, k)
 	}
+	for k := range s.FieldsOptions {
+		delete(s.FieldsOptions, k)
+	}
 	s.FieldsInv = s.FieldsInv[:0]
 	s.metaBuf.Reset()
 	s.tmp0 = s.tmp0[:0]
@@ -173,15 +179,23 @@ func (s *interim) convert() (uint64, uint64, error) {
 	if s.FieldsMap == nil {
 		s.FieldsMap = map[string]uint16{}
 	}
+	if s.FieldsOptions == nil {
+		s.FieldsOptions = map[string]index.FieldIndexingOptions{}
+	}
 
 	s.getOrDefineField("_id") // _id field is fieldID 0
 
+	var fName string
 	for _, result := range s.results {
 		result.VisitComposite(func(field index.CompositeField) {
-			s.getOrDefineField(field.Name())
+			fName = field.Name()
+			s.getOrDefineField(fName)
+			s.FieldsOptions[fName] = field.Options()
 		})
 		result.VisitFields(func(field index.Field) {
-			s.getOrDefineField(field.Name())
+			fName = field.Name()
+			s.getOrDefineField(fName)
+			s.FieldsOptions[fName] = field.Options()
 		})
 	}
 
@@ -192,10 +206,11 @@ func (s *interim) convert() (uint64, uint64, error) {
 	}
 
 	args := map[string]interface{}{
-		"results":   s.results,
-		"chunkMode": s.chunkMode,
-		"fieldsMap": s.FieldsMap,
-		"fieldsInv": s.FieldsInv,
+		"results":       s.results,
+		"chunkMode":     s.chunkMode,
+		"fieldsMap":     s.FieldsMap,
+		"fieldsInv":     s.FieldsInv,
+		"fieldsOptions": s.FieldsOptions,
 	}
 	if s.opaque == nil {
 		s.opaque = map[int]resetable{}
 	}
@@ -236,7 +251,7 @@ func (s *interim) convert() (uint64, uint64, error) {
 
 	// we can persist a new fields section here
 	// this new fields section will point to the various indexes available
-	sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque)
+	sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.FieldsOptions, s.w, s.opaque)
 	if err != nil {
 		return 0, 0, err
 	}
diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index 7928a8a0..3a6e8fc0 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -105,8 +105,8 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 		if _, ok := sb.fieldsMap[fieldName]; !ok {
 			continue
 		}
-		// early exit if index data is supposed to be deleted
-		if info, ok := vo.updatedFields[fieldName]; ok && info.Index {
+		// early exit if field is not required to be indexed
+		if !vo.fieldsOptions[fieldName].IsIndexed() {
 			continue
 		}
@@ -690,10 +690,9 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable)
 
 func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable {
 	rv := &vectorIndexOpaque{
-		fieldAddrs:    make(map[uint16]int),
-		vecIDMap:      make(map[int64]*vecInfo),
-		vecFieldMap:   make(map[uint16]*indexContent),
-		updatedFields: make(map[string]*index.UpdateFieldInfo),
+		fieldAddrs:  make(map[uint16]int),
+		vecIDMap:    make(map[int64]*vecInfo),
+		vecFieldMap: make(map[uint16]*indexContent),
 	}
 	for k, v := range args {
 		rv.Set(k, v)
 	}
@@ -732,7 +731,8 @@ type vectorIndexOpaque struct {
 	// index to be build.
 	vecFieldMap map[uint16]*indexContent
 
-	updatedFields map[string]*index.UpdateFieldInfo
+	// field indexing options
+	fieldsOptions map[string]index.FieldIndexingOptions
 
 	tmp0 []byte
 }
@@ -773,7 +773,8 @@ func (v *vectorIndexOpaque) Reset() (err error) {
 	v.vecFieldMap = nil
 	v.vecIDMap = nil
 	v.tmp0 = v.tmp0[:0]
-	v.updatedFields = nil
+
+	v.fieldsOptions = nil
 
 	atomic.StoreUint64(&v.bytesWritten, 0)
 
@@ -782,7 +783,7 @@ func (v *vectorIndexOpaque) Set(key string, val interface{}) {
 	switch key {
-	case "updatedFields":
-		v.updatedFields = val.(map[string]*index.UpdateFieldInfo)
+	case "fieldsOptions":
+		v.fieldsOptions = val.(map[string]index.FieldIndexingOptions)
 	}
 }
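Both sections receive the merged options through the same plumbing: the caller places the map in the args under the "fieldsOptions" key, InitOpaque replays every key through Set, and Set type-asserts the values it recognizes. A reduced sketch of that wiring with stand-in types:

    package main

    import "fmt"

    type FieldIndexingOptions uint64

    // Stand-in for the section opaque that keeps per-field options around
    // between the merge phases.
    type sectionOpaque struct {
        fieldsOptions map[string]FieldIndexingOptions
    }

    // Set mirrors vectorIndexOpaque.Set / invertedIndexOpaque.Set: pick the
    // keys this section cares about and type-assert their values.
    func (o *sectionOpaque) Set(key string, val interface{}) {
        switch key {
        case "fieldsOptions":
            o.fieldsOptions = val.(map[string]FieldIndexingOptions)
        }
    }

    // initOpaque mirrors InitOpaque: replay every arg through Set.
    func initOpaque(args map[string]interface{}) *sectionOpaque {
        rv := &sectionOpaque{}
        for k, v := range args {
            rv.Set(k, v)
        }
        return rv
    }

    func main() {
        args := map[string]interface{}{
            "fieldsOptions": map[string]FieldIndexingOptions{"title": 1},
            "numDocs":       uint64(42), // ignored by this section's Set
        }
        o := initOpaque(args)
        fmt.Println(o.fieldsOptions["title"])
    }
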
diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go
index 98316404..2d943f87 100644
--- a/section_inverted_text_index.go
+++ b/section_inverted_text_index.go
@@ -80,9 +80,8 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI
 }
 
 func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
-	fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool,
-	newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32,
-	updatedFields map[string]*index.UpdateFieldInfo, w *CountHashWriter,
+	fieldsInv []string, fieldsMap map[string]uint16, fieldsOptions map[string]index.FieldIndexingOptions,
+	fieldsSame bool, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32, w *CountHashWriter,
 	closeCh chan struct{}) (map[int]int, uint64, error) {
 	var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
 	var bufLoc []uint64
@@ -126,8 +125,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
 			if isClosed(closeCh) {
 				return nil, 0, seg.ErrClosed
 			}
-			// early exit if index data is supposed to be deleted
-			if info, ok := updatedFields[fieldName]; ok && info.Index {
+			// early exit if the field's index option is false
+			if !fieldsOptions[fieldName].IsIndexed() {
 				continue
 			}
@@ -249,8 +248,8 @@
 				postItr = postings.iterator(true, true, true, postItr)
 
-				// can only safely copy data if no field data has been deleted
-				if fieldsSame && len(updatedFields) == 0 {
+				// can only safely copy data if all segments have the same fields and options
+				if fieldsSame {
 					// can optimize by copying freq/norm/loc bytes directly
 					lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying(
 						term, postItr, newDocNums[itrI], newRoaring,
@@ -323,8 +322,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
 			if isClosed(closeCh) {
 				return nil, 0, seg.ErrClosed
 			}
-			// early exit if docvalues data is supposed to be deleted
-			if info, ok := updatedFields[fieldName]; ok && info.DocValues {
+			// early exit if docvalues are not wanted for this field
+			if !fieldsOptions[fieldName].IncludeDocValues() {
 				continue
 			}
 			fieldIDPlus1 := uint16(segment.fieldsMap[fieldName])
@@ -407,7 +406,7 @@ func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*S
 	w *CountHashWriter, closeCh chan struct{}) error {
 	io := i.getInvertedIndexOpaque(opaque)
 	fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv,
-		io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh)
+		io.FieldsMap, io.FieldsOptions, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh)
 	if err != nil {
 		return err
 	}
@@ -953,8 +952,7 @@ func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int {
 
 func (i *invertedTextIndexSection) InitOpaque(args map[string]interface{}) resetable {
 	rv := &invertedIndexOpaque{
-		fieldAddrs:    map[int]int{},
-		updatedFields: map[string]*index.UpdateFieldInfo{},
+		fieldAddrs: map[int]int{},
 	}
 	for k, v := range args {
 		rv.Set(k, v)
 	}
@@ -981,6 +979,10 @@ type invertedIndexOpaque struct {
 	// field id -> name
 	FieldsInv []string
 
+	// Field indexing options
+	// field name -> options
+	FieldsOptions map[string]index.FieldIndexingOptions
+
 	// Term dictionaries for each field
 	// field id -> term -> postings list id + 1
 	Dicts []map[string]uint64
@@ -1023,8 +1025,6 @@ type invertedIndexOpaque struct {
 
 	fieldAddrs map[int]int
 
-	updatedFields map[string]*index.UpdateFieldInfo
-
 	fieldsSame bool
 	numDocs    uint64
 }
@@ -1035,6 +1035,7 @@ func (io *invertedIndexOpaque) Reset() (err error) {
 	io.init = false
 	io.chunkMode = 0
 	io.FieldsMap = nil
+	io.FieldsOptions = nil
 	io.FieldsInv = nil
 	for i := range io.Dicts {
 		io.Dicts[i] = nil
@@ -1079,7 +1080,6 @@ func (io *invertedIndexOpaque) Reset() (err error) {
 	io.numDocs = 0
 
 	clear(io.fieldAddrs)
-	clear(io.updatedFields)
 
 	return err
 }
@@ -1093,11 +1093,11 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) {
 		i.fieldsSame = val.(bool)
 	case "fieldsMap":
 		i.FieldsMap = val.(map[string]uint16)
+	case "fieldsOptions":
+		i.FieldsOptions = val.(map[string]index.FieldIndexingOptions)
 	case "fieldsInv":
 		i.FieldsInv = val.([]string)
 	case "numDocs":
 		i.numDocs = val.(uint64)
-	case "updatedFields":
-		i.updatedFields = val.(map[string]*index.UpdateFieldInfo)
 	}
 }
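Within this section the options now gate each merge phase separately: postings are merged only for fields whose index option is still set, and the docvalues pass runs only for fields that still carry doc values. A schematic sketch of that control flow (stand-in types; the real loops also remap doc numbers and write the encoded output):

    package main

    import "fmt"

    type FieldIndexingOptions uint64

    const (
        IndexField FieldIndexingOptions = 1 << iota
        StoreField
        DocValues
    )

    func (o FieldIndexingOptions) IsIndexed() bool        { return o&IndexField != 0 }
    func (o FieldIndexingOptions) IncludeDocValues() bool { return o&DocValues != 0 }

    func mergeInvertedSection(fieldsInv []string, fieldsOptions map[string]FieldIndexingOptions) {
        // postings pass
        for _, fieldName := range fieldsInv {
            if !fieldsOptions[fieldName].IsIndexed() {
                continue // field no longer indexed: nothing to merge
            }
            fmt.Println("merge postings for", fieldName)
        }
        // docvalues pass
        for _, fieldName := range fieldsInv {
            if !fieldsOptions[fieldName].IncludeDocValues() {
                continue // field no longer carries doc values
            }
            fmt.Println("merge docvalues for", fieldName)
        }
    }

    func main() {
        mergeInvertedSection(
            []string{"_id", "title", "summary"},
            map[string]FieldIndexingOptions{
                "_id":     IndexField | StoreField,
                "title":   IndexField | DocValues,
                "summary": StoreField, // stored only: both passes skip it
            })
    }
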
diff --git a/segment.go b/segment.go
index ea7e68ce..fd72a85c 100644
--- a/segment.go
+++ b/segment.go
@@ -54,6 +54,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {
 	rv := &Segment{
 		SegmentBase: SegmentBase{
 			fieldsMap:     make(map[string]uint16),
+			fieldsOptions: make(map[string]index.FieldIndexingOptions),
 			invIndexCache: newInvertedIndexCache(),
 			vecIndexCache: newVectorIndexCache(),
 			synIndexCache: newSynonymIndexCache(),
@@ -96,9 +97,10 @@ type SegmentBase struct {
 	mem       []byte
 	memCRC    uint32
 	chunkMode uint32
-	fieldsMap         map[string]uint16   // fieldName -> fieldID+1
-	fieldsInv         []string            // fieldID -> fieldName
-	fieldsSectionsMap []map[uint16]uint64 // fieldID -> section -> address
+	fieldsMap         map[string]uint16                     // fieldName -> fieldID+1
+	fieldsOptions     map[string]index.FieldIndexingOptions // fieldName -> fieldOptions
+	fieldsInv         []string                              // fieldID -> fieldName
+	fieldsSectionsMap []map[uint16]uint64                   // fieldID -> section -> address
 	numDocs             uint64
 	storedIndexOffset   uint64
 	sectionsIndexOffset uint64
@@ -128,6 +130,11 @@ func (sb *SegmentBase) updateSize() {
 		sizeInBytes += (len(k) + SizeOfString) + SizeOfUint16
 	}
 
+	// fieldsOptions
+	for k := range sb.fieldsOptions {
+		sizeInBytes += (len(k) + SizeOfString) + SizeOfUint64
+	}
+
 	// fieldsInv
 	for _, entry := range sb.fieldsInv {
 		sizeInBytes += len(entry) + SizeOfString
@@ -341,8 +348,13 @@ func (sb *SegmentBase) loadField(fieldID uint16, pos uint64,
 	fieldName := string(sb.mem[pos : pos+fieldNameLen])
 	pos += fieldNameLen
 
+	// read field options
+	fieldOptions, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
+	pos += uint64(sz)
+
 	sb.fieldsInv = append(sb.fieldsInv, fieldName)
 	sb.fieldsMap[fieldName] = uint16(fieldID + 1)
+	sb.fieldsOptions[fieldName] = index.FieldIndexingOptions(fieldOptions)
 
 	fieldNumSections, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
 	pos += uint64(sz)
diff --git a/write.go b/write.go
index 39382d05..a3c049ea 100644
--- a/write.go
+++ b/write.go
@@ -19,6 +19,7 @@ import (
 	"io"
 
 	"github.com/RoaringBitmap/roaring/v2"
+	index "github.com/blevesearch/bleve_index_api"
 )
 
 // writes out the length of the roaring bitmap in bytes as varint
@@ -50,7 +51,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
 	return tw, nil
 }
 
-func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) {
+func persistFieldsSection(fieldsInv []string, fieldsOptions map[string]index.FieldIndexingOptions, w *CountHashWriter, opaque map[int]resetable) (uint64, error) {
 	var rv uint64
 	fieldsOffsets := make([]uint64, 0, len(fieldsInv))
@@ -70,6 +71,13 @@ func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int
 			return 0, err
 		}
 
+		// write out the field options
+		fieldOpts := fieldsOptions[fieldName]
+		_, err = writeUvarints(w, uint64(fieldOpts))
+		if err != nil {
+			return 0, err
+		}
+
 		// write out the number of field-specific indexes
 		// FIXME hard-coding to 2, and not attempting to support sparseness well
 		_, err = writeUvarints(w, uint64(len(segmentSections)))
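persistFieldsSection writes the options bitmask as a uvarint immediately after the field name, and loadField decodes it from the same position, so the value round-trips losslessly however many flag bits are set. A small sketch of that round trip using only encoding/binary (the zap code itself goes through its writeUvarints helper on the write side):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        // Stand-in options value: three low bits set, as if index, store and
        // docvalues were all enabled for the field.
        fieldOpts := uint64(0b111)

        // Write side (what persistFieldsSection does after the field name).
        buf := make([]byte, binary.MaxVarintLen64)
        n := binary.PutUvarint(buf, fieldOpts)

        // Read side (what loadField does at the same position).
        decoded, sz := binary.Uvarint(buf[:n])
        fmt.Println(decoded == fieldOpts, sz == n) // true true
    }
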
diff --git a/zap.md b/zap.md
index 843a4ad0..a7d308af 100644
--- a/zap.md
+++ b/zap.md
@@ -95,20 +95,21 @@ Stored Fields Data is an arbitrary size record, which consists of metadata and [
 Sections Index is a set of NF uint64 addresses (0 through F# - 1) each of which are offsets to the records in the Sections Info.
 Inside the sections info, we have further offsets to specific type of index section for that particular field in the segment file. For example, field 0 may correspond to Vector Indexing and its records would have offsets to the Vector Index Section whereas a field 1 may correspond to Text Indexing and its records would rather point to somewhere within the Inverted Text Index Section.
 
-  (...)                                                                        [F]                                  [F + F#]
-  +                              Sections Info                                 +           Sections Index           +
-  |============================================================================|=====================================|
-  |                                                                            |                                     |
-  |  +---------+---------+-----+---------+---------+~~~~~~~~+~~~~~~~~+--+...+-+ | +-------+--------+...+------+-----+ |
-  +----> S1 Addr | S1 Type | ... | Sn Addr | Sn Type |   NS   | Length | Name | | |   0   |    1   |   | F#-1 |  NF | |
-  |  |  +---------+---------+-----+---------+---------+~~~~~~~~+~~~~~~~~+--+...+-+ | +-------+----+---+...+------+-----+ |
-  |  |                                                                            |              |                      |
-  |  +============================================================================+==============|======================+
-  |                                                                                              |
-  +----------------------------------------------------------------------------------------------+
+  (...)                                                                       [F]                              [F + F#]
+  +                             Sections Info                                 +          Sections Index          +
+  |===========================================================================|=================================|
+  |                                                                           |                                 |
+  |  +--------+------+---+----+---------+---------+~~~~~+--+...+--+~~~~~~~~~+ | +------+------+...+------+----+ |
+  +---->| Length | Name | O | NS | S1 Type | S1 Addr | ... | Sn Type | Sn Addr | | |  0   |  1   |   | F#-1 | NF | |
+  |  |  +--------+------+---+----+---------+---------+~~~~~+--+...+--+~~~~~~~~~+ | +------+----+-+...+------+----+ |
+  |  |                                                                           |            |                    |
+  |  +===========================================================================+=============|===================+
+  |                                                                                            |
+  +--------------------------------------------------------------------------------------------+
 
   NF. Number of fields
   NS. Number of index sections
+  O.  Field Indexing Options
   Sn. nth index section
 
 ## Inverted Text Index Section
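Following the updated diagram, a Sections Info record can be walked sequentially: Length, Name, the new options value O, NS, and then NS (section type, section address) pairs. A hedged decoding sketch, assuming every integer component is uvarint-encoded as the loadField changes above suggest — the authoritative layout is whatever the zap writer and reader actually implement:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // fieldRecord is a stand-in for a decoded Sections Info entry.
    type fieldRecord struct {
        name     string
        options  uint64
        sections map[uint64]uint64 // section type -> section address
    }

    // decodeFieldRecord walks one record: Length, Name, O, NS, then NS pairs
    // of (section type, section address). All integers are assumed uvarints.
    func decodeFieldRecord(mem []byte) (fieldRecord, int) {
        pos := 0
        nameLen, sz := binary.Uvarint(mem[pos:])
        pos += sz
        rec := fieldRecord{
            name:     string(mem[pos : pos+int(nameLen)]),
            sections: map[uint64]uint64{},
        }
        pos += int(nameLen)

        rec.options, sz = binary.Uvarint(mem[pos:]) // the new "O" component
        pos += sz

        numSections, sz := binary.Uvarint(mem[pos:])
        pos += sz
        for i := uint64(0); i < numSections; i++ {
            typ, sz := binary.Uvarint(mem[pos:])
            pos += sz
            addr, sz := binary.Uvarint(mem[pos:])
            pos += sz
            rec.sections[typ] = addr
        }
        return rec, pos
    }

    func main() {
        // Encode a matching record by hand to exercise the decoder.
        var buf []byte
        put := func(v uint64) { buf = binary.AppendUvarint(buf, v) }
        put(5)
        buf = append(buf, "title"...)
        put(0b111) // options
        put(1)     // one section
        put(2)     // section type
        put(4096)  // section address
        rec, n := decodeFieldRecord(buf)
        fmt.Println(rec.name, rec.options, rec.sections, n == len(buf))
    }
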