5 changes: 3 additions & 2 deletions build.go
@@ -171,8 +171,9 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
vecIndexCache: newVectorIndexCache(),
synIndexCache: newSynonymIndexCache(),
// following fields get populated by loadFields
fieldsMap: make(map[string]uint16),
fieldsInv: make([]string, 0),
fieldsMap: make(map[string]uint16),
fieldsOptions: make(map[string]index.FieldIndexingOptions),
fieldsInv: make([]string, 0),
}
sb.updateSize()

90 changes: 76 additions & 14 deletions merge.go
@@ -121,17 +121,55 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf
return fieldsInv[:idx]
}

// Update field options using updatedFields to override the options selected
// during mergeFields, if needed. This includes removing options for deleted
// fields and disabling options for fields whose pending updates have not yet
// been propagated because a new segment has not been created.
func finalizeFieldOptions(fieldOptions map[string]index.FieldIndexingOptions,
updatedFields map[string]*index.UpdateFieldInfo) map[string]index.FieldIndexingOptions {
for field, opts := range fieldOptions {
if info, ok := updatedFields[field]; ok {
// if field is deleted, remove its options
if info.Deleted {
delete(fieldOptions, field)
continue
}
// otherwise, update options based on info
if info.Index {
// ensure indexing is disabled
opts &^= index.IndexField
}
if info.Store {
// ensure storing is disabled
opts &^= index.StoreField
}
if info.DocValues {
// ensure doc values are disabled
opts &^= index.DocValues
}
fieldOptions[field] = opts
}
}
return fieldOptions
}
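
Not part of this change: a minimal, runnable sketch of the AND NOT bit clearing that finalizeFieldOptions relies on, using only the FieldIndexingOptions constants and accessors already referenced in this diff (the import path is an assumption based on the index package zapx normally uses).

package main

import (
	"fmt"

	index "github.com/blevesearch/bleve_index_api"
)

func main() {
	// a field that is currently indexed, stored, and carries doc values
	opts := index.IndexField | index.StoreField | index.DocValues

	// an UpdateFieldInfo with Store set asks for the stored data to be dropped,
	// so finalizeFieldOptions clears the StoreField bit with AND NOT
	opts &^= index.StoreField

	fmt.Println(opts.IsIndexed()) // true  -- indexing is untouched
	fmt.Println(opts.IsStored())  // false -- storing is now disabled
}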

func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
fieldsInv []string, fieldsMap map[string]uint16, sectionsIndexOffset uint64,
err error) {

var fieldsSame bool
fieldsSame, fieldsInv = mergeFields(segments)
var fieldsOptions map[string]index.FieldIndexingOptions
fieldsSame, fieldsInv, fieldsOptions = mergeFields(segments)
updatedFields := mergeUpdatedFields(segments)
fieldsInv = filterFields(fieldsInv, updatedFields)
fieldsMap = mapFields(fieldsInv)
fieldsOptions = finalizeFieldOptions(fieldsOptions, updatedFields)
// fieldsSame cannot be true if any fields were updated or deleted
if len(updatedFields) > 0 {
fieldsSame = false
}

numDocs = computeNewDocCount(segments, drops)

@@ -148,12 +186,12 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
"fieldsSame": fieldsSame,
"fieldsMap": fieldsMap,
"numDocs": numDocs,
"updatedFields": updatedFields,
"fieldsOptions": fieldsOptions,
}

if numDocs > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields)
fieldsMap, fieldsInv, fieldsOptions, fieldsSame, numDocs, cr, closeCh)
if err != nil {
return nil, 0, 0, nil, nil, 0, err
}
@@ -171,7 +209,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,

// we can persist the fields section index now, this will point
// to the various indexes (each in different section) available for a field.
sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque)
sectionsIndexOffset, err = persistFieldsSection(fieldsInv, fieldsOptions, cr, mergeOpaque)
if err != nil {
return nil, 0, 0, nil, nil, 0, err
}
@@ -372,8 +410,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo
type varintEncoder func(uint64) (int, error)

func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) {
fieldsMap map[string]uint16, fieldsInv []string,
fieldsOptions map[string]index.FieldIndexingOptions,
fieldsSame bool, newSegDocCount uint64,
w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) {
var rv [][]uint64 // The remapped or newDocNums for each segment.

var newDocNum uint64
@@ -413,7 +453,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
// segments and there are no deletions, via byte-copying
// of stored docs bytes directly to the writer
// cannot copy directly if fields might have been deleted
if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 {
if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) {
err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
if err != nil {
return 0, nil, err
@@ -456,8 +496,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
// no entry for field in fieldsMap
return false
}
// early exit if the stored portion of the field is deleted
if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store {
// early exit if storing is disabled for this field
if !fieldsOptions[field].IsStored() {
return true
}
vals[fieldID] = append(vals[fieldID], value)
@@ -491,8 +531,12 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,

// now walk the non-"_id" fields in order
for fieldID := 1; fieldID < len(fieldsInv); fieldID++ {
// early exit if the stored portion of the field is deleted
if val, ok := updatedFields[fieldsInv[fieldID]]; ok && val.Store {
// early exit if storing is disabled for this field
if !fieldsOptions[fieldsInv[fieldID]].IsStored() {
continue
}
// early exit if no stored values for this field
if len(vals[fieldID]) == 0 {
continue
}
storedFieldValues := vals[fieldID]
@@ -597,7 +641,7 @@ func (sb *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint6
// input segments, and computes whether the fields are the same across
segments (which depends on the fields being sorted in the same way
across segments)
func mergeFields(segments []*SegmentBase) (bool, []string) {
func mergeFields(segments []*SegmentBase) (bool, []string, map[string]index.FieldIndexingOptions) {
fieldsSame := true

var segment0Fields []string
@@ -606,10 +650,26 @@
}

fieldsExist := map[string]struct{}{}
fieldOptions := map[string]index.FieldIndexingOptions{}
for _, segment := range segments {
fields := segment.Fields()
for fieldi, field := range fields {
fieldsExist[field] = struct{}{}

if prev, ok := fieldOptions[field]; ok {
// Merge options conservatively: once a field option is disabled (bit cleared)
// in any segment, it remains disabled. This ensures deterministic behavior
// when options can only transition from true -> false.
fieldOptions[field] = prev & segment.fieldsOptions[field]
// any cleared bit means this segment disables an option that was enabled so far
if fieldOptions[field] != prev {
fieldsSame = false
}
} else {
// first occurrence of the field
fieldOptions[field] = segment.fieldsOptions[field]
}
if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
fieldsSame = false
}
@@ -627,7 +687,7 @@

sort.Strings(rv[1:]) // leave _id as first

return fieldsSame, rv
return fieldsSame, rv, fieldOptions
}
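
Not part of this change: a small sketch of the conservative option merge in mergeFields above -- intersecting per-segment options with AND keeps an option disabled once any segment has it disabled (the two option values and the import path are assumptions for illustration).

package main

import (
	"fmt"

	index "github.com/blevesearch/bleve_index_api"
)

func main() {
	// the same field as seen by two segments: the older segment still stores it,
	// the newer segment was written after storing was turned off
	older := index.IndexField | index.StoreField
	newer := index.IndexField

	merged := older & newer // bitwise AND, as in mergeFields

	fmt.Println(merged.IsIndexed()) // true
	fmt.Println(merged.IsStored())  // false
	fmt.Println(merged != older)    // true -> mergeFields sets fieldsSame to false
}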

// Combine updateFieldInfo from all segments
@@ -639,8 +699,10 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn
if fieldInfo == nil {
fieldInfo = make(map[string]*index.UpdateFieldInfo)
}
// if field not present, add it
if _, ok := fieldInfo[field]; !ok {
fieldInfo[field] = &index.UpdateFieldInfo{
// mark whether field is deleted in any segment
Deleted: info.Deleted,
Index: info.Index,
Store: info.Store,
@@ -650,7 +712,7 @@ func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldIn
fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted
fieldInfo[field].Index = fieldInfo[field].Index || info.Index
fieldInfo[field].Store = fieldInfo[field].Store || info.Store
fieldInfo[field].DocValues = fieldInfo[field].Store || info.DocValues
fieldInfo[field].DocValues = fieldInfo[field].DocValues || info.DocValues
}
}

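Not part of this change: a sketch of how mergeUpdatedFields ORs per-segment UpdateFieldInfo flags together, including the DocValues flag that this diff now ORs with DocValues instead of Store (inputs and import path are illustrative assumptions).

package main

import (
	"fmt"

	index "github.com/blevesearch/bleve_index_api"
)

func main() {
	// the same field carries different pending updates in two segments
	a := &index.UpdateFieldInfo{Store: true}
	b := &index.UpdateFieldInfo{DocValues: true}

	combined := &index.UpdateFieldInfo{
		Deleted:   a.Deleted || b.Deleted,
		Index:     a.Index || b.Index,
		Store:     a.Store || b.Store,
		DocValues: a.DocValues || b.DocValues, // previously ORed with Store by mistake
	}

	// both the Store and DocValues updates survive the combination
	fmt.Println(combined.Store, combined.DocValues) // true true
}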
29 changes: 22 additions & 7 deletions new.go
@@ -102,6 +102,9 @@ type interim struct {
// name -> field id + 1
FieldsMap map[string]uint16

// FieldsOptions holds the indexing options for each field
FieldsOptions map[string]index.FieldIndexingOptions

// FieldsInv is the inverse of FieldsMap
// field id -> name
FieldsInv []string
@@ -127,6 +130,9 @@ func (s *interim) reset() (err error) {
for k := range s.FieldsMap {
delete(s.FieldsMap, k)
}
for k := range s.FieldsOptions {
delete(s.FieldsOptions, k)
}
s.FieldsInv = s.FieldsInv[:0]
s.metaBuf.Reset()
s.tmp0 = s.tmp0[:0]
@@ -173,15 +179,23 @@ func (s *interim) convert() (uint64, uint64, error) {
if s.FieldsMap == nil {
s.FieldsMap = map[string]uint16{}
}
if s.FieldsOptions == nil {
s.FieldsOptions = map[string]index.FieldIndexingOptions{}
}

s.getOrDefineField("_id") // _id field is fieldID 0

var fName string
for _, result := range s.results {
result.VisitComposite(func(field index.CompositeField) {
s.getOrDefineField(field.Name())
fName = field.Name()
s.getOrDefineField(fName)
s.FieldsOptions[fName] = field.Options()
})
result.VisitFields(func(field index.Field) {
s.getOrDefineField(field.Name())
fName = field.Name()
s.getOrDefineField(fName)
s.FieldsOptions[fName] = field.Options()
})
}

@@ -192,10 +206,11 @@ func (s *interim) convert() (uint64, uint64, error) {
}

args := map[string]interface{}{
"results": s.results,
"chunkMode": s.chunkMode,
"fieldsMap": s.FieldsMap,
"fieldsInv": s.FieldsInv,
"results": s.results,
"chunkMode": s.chunkMode,
"fieldsMap": s.FieldsMap,
"fieldsInv": s.FieldsInv,
"fieldsOptions": s.FieldsOptions,
}
if s.opaque == nil {
s.opaque = map[int]resetable{}
@@ -236,7 +251,7 @@ func (s *interim) convert() (uint64, uint64, error) {

// we can persist a new fields section here
// this new fields section will point to the various indexes available
sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque)
sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.FieldsOptions, s.w, s.opaque)
if err != nil {
return 0, 0, err
}
21 changes: 11 additions & 10 deletions section_faiss_vector_index.go
@@ -105,8 +105,8 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
if _, ok := sb.fieldsMap[fieldName]; !ok {
continue
}
// early exit if index data is supposed to be deleted
if info, ok := vo.updatedFields[fieldName]; ok && info.Index {
// early exit if indexing is disabled for this field
if !vo.fieldsOptions[fieldName].IsIndexed() {
continue
}

@@ -690,10 +690,9 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable)

func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable {
rv := &vectorIndexOpaque{
fieldAddrs: make(map[uint16]int),
vecIDMap: make(map[int64]*vecInfo),
vecFieldMap: make(map[uint16]*indexContent),
updatedFields: make(map[string]*index.UpdateFieldInfo),
fieldAddrs: make(map[uint16]int),
vecIDMap: make(map[int64]*vecInfo),
vecFieldMap: make(map[uint16]*indexContent),
}
for k, v := range args {
rv.Set(k, v)
@@ -732,7 +731,8 @@ type vectorIndexOpaque struct {
// index to be built.
vecFieldMap map[uint16]*indexContent

updatedFields map[string]*index.UpdateFieldInfo
// field indexing options
fieldsOptions map[string]index.FieldIndexingOptions

tmp0 []byte
}
@@ -773,7 +773,8 @@ func (v *vectorIndexOpaque) Reset() (err error) {
v.vecFieldMap = nil
v.vecIDMap = nil
v.tmp0 = v.tmp0[:0]
v.updatedFields = nil

v.fieldsOptions = nil

atomic.StoreUint64(&v.bytesWritten, 0)

@@ -782,7 +783,7 @@ func (v *vectorIndexOpaque) Set(key string, val interface{}) {

func (v *vectorIndexOpaque) Set(key string, val interface{}) {
switch key {
case "updatedFields":
v.updatedFields = val.(map[string]*index.UpdateFieldInfo)
case "fieldsOptions":
v.fieldsOptions = val.(map[string]index.FieldIndexingOptions)
}
}
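
Not part of this change: a minimal sketch of the gate added to the vector section's Merge -- a field is only considered for a merged vector index while its options still carry the index bit (field names and the import path are illustrative assumptions).

package main

import (
	"fmt"

	index "github.com/blevesearch/bleve_index_api"
)

func main() {
	fieldsOptions := map[string]index.FieldIndexingOptions{
		"vec":   index.IndexField | index.DocValues,
		"title": index.StoreField, // indexing was disabled for this field
	}

	for _, fieldName := range []string{"vec", "title"} {
		if !fieldsOptions[fieldName].IsIndexed() {
			fmt.Println("skip", fieldName) // mirrors the early continue in Merge
			continue
		}
		fmt.Println("building merged vector index for", fieldName)
	}
}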