diff --git a/builder.go b/builder.go
index f170317ee..ef3e45eb1 100644
--- a/builder.go
+++ b/builder.go
@@ -74,7 +74,12 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int
 	// do not use real config, as these are options for the builder,
 	// not the resulting index
 	meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{})
-	err = meta.Save(path)
+
+	writer, err := util.NewFileWriter([]byte(metaFilename))
+	if err != nil {
+		return nil, err
+	}
+	err = meta.Save(path, writer)
 	if err != nil {
 		return nil, err
 	}
diff --git a/index.go b/index.go
index a9c8ada34..25342e30d 100644
--- a/index.go
+++ b/index.go
@@ -388,3 +388,9 @@ type SynonymIndex interface {
 	// IndexSynonym indexes a synonym definition, with the specified id and belonging to the specified collection.
 	IndexSynonym(id string, collection string, definition *SynonymDefinition) error
 }
+
+// CustomizableIndex is an optional interface for indexes that track which
+// writer/reader callback keys currently protect their on-disk files.
+type CustomizableIndex interface {
+	Index
+	// KeysInUse reports the set of callback key ids referenced by the index files.
+	KeysInUse() (map[string]struct{}, error)
+	// DropKeys rewrites all data protected by the given key ids so they are no longer in use.
+	DropKeys(ids map[string]struct{}) error
+}
diff --git a/index/scorch/merge.go b/index/scorch/merge.go
index 9abcf2db6..b6da0c572 100644
--- a/index/scorch/merge.go
+++ b/index/scorch/merge.go
@@ -89,10 +89,19 @@ OUTER:
 		}
 
 		startTime := time.Now()
-
+		var err error
 		// lets get started
-		err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
-			ourSnapshot)
+		if ctrlMsg.plan == nil {
+			err = s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
+				ourSnapshot)
+		} else {
+			cw := newCloseChWrapper(s.closeCh, ctrlMsg.ctx)
+			go cw.listen()
+
+			err = s.executePlanMergeAtSnapshot(ctrlMsg.plan, cw)
+			// close explicitly: a defer inside this long-running loop would
+			// not run until the merger goroutine exits
+			cw.close()
+		}
+
 		if err != nil {
 			atomic.StoreUint64(&s.iStats.mergeEpoch, 0)
 			if err == segment.ErrClosed {
@@ -161,6 +170,7 @@ OUTER:
 
 type mergerCtrl struct {
 	ctx     context.Context
 	options *mergeplan.MergePlanOptions
+	plan    *mergeplan.MergePlan
 	doneCh  chan struct{}
 }
@@ -301,15 +311,18 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
 	atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))
 
-	// process tasks in serial for now
-	var filenames []string
-
 	cw := newCloseChWrapper(s.closeCh, ctx)
 	defer cw.close()
 	go cw.listen()
 
-	for _, task := range resultMergePlan.Tasks {
+	return s.executePlanMergeAtSnapshot(resultMergePlan, cw)
+}
+
+func (s *Scorch) executePlanMergeAtSnapshot(plan *mergeplan.MergePlan, cw *closeChWrapper) error {
+	// process tasks in serial for now
+	var filenames []string
+
+	for _, task := range plan.Tasks {
 		if len(task.Segments) == 0 {
 			atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
 			continue
diff --git a/index/scorch/persister.go b/index/scorch/persister.go
index d92c3a85b..6dcd9f770 100644
--- a/index/scorch/persister.go
+++ b/index/scorch/persister.go
@@ -605,9 +605,8 @@ func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory,
 	return err
 }
 
-func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
-	segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory) (
-	[]string, map[uint64]string, error) {
+func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segPlugin SegmentPlugin,
+	exclude map[uint64]struct{}, d index.Directory) ([]string, map[uint64]string, error) {
 	snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket)
 	if err != nil {
 		return nil, nil, err
@@ -633,6 +632,15 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 	if err != nil {
 		return nil, nil, err
 	}
+	writer, err := util.NewFileWriter(util.BoltWriterContext)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	err = metaBucket.Put(util.BoltMetaDataWriterIdKey, []byte(writer.Id()))
+	if err != nil {
+		return nil, nil, err
+	}
 
 	// Storing the timestamp at which the current indexSnapshot
 	// was persisted, useful when you want to spread the
@@ -655,7 +663,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 	}
 	// TODO optimize writing these in order?
 	for k, v := range snapshot.internal {
-		err = internalBucket.Put([]byte(k), v)
+		buf := writer.Process(v)
+		err = internalBucket.Put([]byte(k), buf)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -665,7 +674,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 	val := make([]byte, 8)
 	bytesWritten := atomic.LoadUint64(&snapshot.parent.stats.TotBytesWrittenAtIndexTime)
 	binary.LittleEndian.PutUint64(val, bytesWritten)
-	err = internalBucket.Put(util.TotBytesWrittenKey, val)
+	buf := writer.Process(val)
+	err = internalBucket.Put(util.TotBytesWrittenKey, buf)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -721,7 +731,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 		if err != nil {
 			return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err)
 		}
-		err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBuf.Bytes())
+		roaringBytes := writer.Process(roaringBuf.Bytes())
+		err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBytes)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -733,7 +744,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 		if err != nil {
 			return nil, nil, err
 		}
-		err = snapshotSegmentBucket.Put(util.BoltStatsKey, b)
+		statsBytes := writer.Process(b)
+		err = snapshotSegmentBucket.Put(util.BoltStatsKey, statsBytes)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -745,7 +757,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 		if err != nil {
 			return nil, nil, err
 		}
-		err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, b)
+		updatedFieldsBytes := writer.Process(b)
+		err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, updatedFieldsBytes)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -953,6 +980,13 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
 		return nil, fmt.Errorf(
 			"unable to load correct segment wrapper: %v", err)
 	}
+	readerId := string(metaBucket.Get(util.BoltMetaDataWriterIdKey))
+	reader, err := util.NewFileReader(readerId, util.BoltWriterContext)
+	if err != nil {
+		_ = rv.DecRef()
+		return nil, fmt.Errorf("unable to load correct reader: %v", err)
+	}
+
 	var running uint64
 	c := snapshot.Cursor()
 	for k, _ := c.First(); k != nil; k, _ = c.Next() {
@@ -963,7 +997,10 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
 			return nil, fmt.Errorf("internal bucket missing")
 		}
 		err := internalBucket.ForEach(func(key []byte, val []byte) error {
-			copiedVal := append([]byte(nil), val...)
+			copiedVal, err := reader.Process(append([]byte(nil), val...))
+			if err != nil {
+				return err
+			}
 			rv.internal[string(key)] = copiedVal
 			return nil
 		})
@@ -977,7 +1014,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
 			_ = rv.DecRef()
 			return nil, fmt.Errorf("segment key, but bucket missing %x", k)
 		}
-		segmentSnapshot, err := s.loadSegment(segmentBucket)
+		segmentSnapshot, err := s.loadSegment(segmentBucket, reader)
 		if err != nil {
 			_ = rv.DecRef()
 			return nil, fmt.Errorf("failed to load segment: %v", err)
@@ -999,7 +1036,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
 	return rv, nil
 }
 
-func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
+func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket, reader *util.FileReader) (*SegmentSnapshot, error) {
 	pathBytes := segmentBucket.Get(util.BoltPathKey)
 	if pathBytes == nil {
 		return nil, fmt.Errorf("segment path missing")
@@ -1017,6 +1054,11 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
 	}
 	deletedBytes := segmentBucket.Get(util.BoltDeletedKey)
 	if deletedBytes != nil {
+		deletedBytes, err = reader.Process(deletedBytes)
+		if err != nil {
+			_ = seg.Close()
+			return nil, err
+		}
 		deletedBitmap := roaring.NewBitmap()
 		r := bytes.NewReader(deletedBytes)
 		_, err := deletedBitmap.ReadFrom(r)
@@ -1031,7 +1073,11 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
 	statBytes := segmentBucket.Get(util.BoltStatsKey)
 	if statBytes != nil {
 		var statsMap map[string]map[string]uint64
-
+		statBytes, err = reader.Process(statBytes)
+		if err != nil {
+			_ = seg.Close()
+			return nil, err
+		}
 		err := json.Unmarshal(statBytes, &statsMap)
 		stats := &fieldStats{statMap: statsMap}
 		if err != nil {
@@ -1043,8 +1089,12 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
 	updatedFieldBytes := segmentBucket.Get(util.BoltUpdatedFieldsKey)
 	if updatedFieldBytes != nil {
 		var updatedFields map[string]*index.UpdateFieldInfo
-
-		err := json.Unmarshal(updatedFieldBytes, &updatedFields)
+		updatedFieldBytes, err := reader.Process(updatedFieldBytes)
+		if err != nil {
+			_ = seg.Close()
+			return nil, err
+		}
+		err = json.Unmarshal(updatedFieldBytes, &updatedFields)
 		if err != nil {
 			_ = seg.Close()
 			return nil, fmt.Errorf("error reading updated field bytes: %v", err)
@@ -1057,6 +1107,141 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
 	return rv, nil
 }
 
+func (s *Scorch) boltKeysInUse() ([]string, error) {
+	keyMap := make(map[string]struct{})
+	err := s.rootBolt.View(func(tx *bolt.Tx) error {
+		snapshots := tx.Bucket(util.BoltSnapshotsBucket)
+		if snapshots == nil {
+			return nil
+		}
+		c := snapshots.Cursor()
+		for k, _ := c.First(); k != nil; k, _ = c.Next() {
+			snapshot := snapshots.Bucket(k)
+			if snapshot == nil {
+				continue
+			}
+			metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
+			if metaBucket == nil {
+				continue
+			}
+			keyId := string(metaBucket.Get(util.BoltMetaDataWriterIdKey))
+			keyMap[keyId] = struct{}{}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	rv := make([]string, 0, len(keyMap))
+	for k := range keyMap {
+		rv = append(rv, k)
+	}
+
+	return rv, nil
+}
+
+func (s *Scorch) removeBoltKeys(ids map[string]struct{}) error {
+	writer, err := util.NewFileWriter(util.BoltWriterContext)
+	if err != nil {
+		return err
+	}
+
+	err = s.rootBolt.Update(func(tx *bolt.Tx) error {
+		snapshots := tx.Bucket(util.BoltSnapshotsBucket)
+		if snapshots == nil {
+			return nil
+		}
+		c := snapshots.Cursor()
+		for k, _ := c.First(); k != nil; k, _ = c.Next() {
+			snapshot := snapshots.Bucket(k)
+			if snapshot == nil {
+				continue
+			}
+			metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
+			if metaBucket == nil {
+				continue
+			}
+			readerId := string(metaBucket.Get(util.BoltMetaDataWriterIdKey))
+			if _, ok := ids[readerId]; ok {
+				reader, err := util.NewFileReader(readerId, util.BoltWriterContext)
+				if err != nil {
+					return fmt.Errorf("unable to load correct reader: %v", err)
+				}
+				cc := snapshot.Cursor()
+				for kk, _ := cc.First(); kk != nil; kk, _ = cc.Next() {
+					if kk[0] == util.BoltInternalKey[0] {
+						internalBucket := snapshot.Bucket(kk)
+						if internalBucket == nil {
+							continue
+						}
+						// re-process all of the internal values; read them
+						// out first, since a bolt bucket must not be
+						// modified while it is being iterated
+						internalVals := make(map[string][]byte)
+						err := internalBucket.ForEach(func(key []byte, val []byte) error {
+							buf, err := reader.Process(append([]byte(nil), val...))
+							if err != nil {
+								return err
+							}
+							internalVals[string(key)] = writer.Process(buf)
+							return nil
+						})
+						if err != nil {
+							return err
+						}
+						for key, val := range internalVals {
+							err = internalBucket.Put([]byte(key), val)
+							if err != nil {
+								return err
+							}
+						}
+					} else if kk[0] != util.BoltMetaDataKey[0] {
+						segmentBucket := snapshot.Bucket(kk)
+						if segmentBucket == nil {
+							continue
+						}
+						// process the deleted key
+						deletedBytes := segmentBucket.Get(util.BoltDeletedKey)
+						if deletedBytes != nil {
+							buf, err := reader.Process(deletedBytes)
+							if err != nil {
+								return err
+							}
+
+							newBuf := writer.Process(buf)
+							err = segmentBucket.Put(util.BoltDeletedKey, newBuf)
+							if err != nil {
+								return err
+							}
+						}
+						// process the stats key
+						statsBytes := segmentBucket.Get(util.BoltStatsKey)
+						if statsBytes != nil {
+							buf, err := reader.Process(statsBytes)
+							if err != nil {
+								return err
+							}
+
+							newBuf := writer.Process(buf)
+							err = segmentBucket.Put(util.BoltStatsKey, newBuf)
+							if err != nil {
+								return err
+							}
+						}
+					}
+				}
+				err = metaBucket.Put(util.BoltMetaDataWriterIdKey, []byte(writer.Id()))
+				if err != nil {
+					return err
+				}
+			}
+		}
+		return nil
+	})
+
+	return err
+}
+
 func (s *Scorch) removeOldData() {
 	removed, err := s.removeOldBoltSnapshots()
 	if err != nil {
diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go
index 83924978e..ea0572a0b 100644
--- a/index/scorch/scorch.go
+++ b/index/scorch/scorch.go
@@ -15,6 +15,7 @@ package scorch
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"os"
@@ -24,6 +25,7 @@ import (
 	"time"
 
 	"github.com/RoaringBitmap/roaring/v2"
+	"github.com/blevesearch/bleve/v2/index/scorch/mergeplan"
 	"github.com/blevesearch/bleve/v2/registry"
 	"github.com/blevesearch/bleve/v2/util"
 	index "github.com/blevesearch/bleve_index_api"
@@ -161,6 +163,7 @@ func NewScorch(storeName string,
 	if ok {
 		rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
 	}
+
 	// validate any custom persistor options to
 	// prevent an async error in the persistor routine
 	_, err = rv.parsePersisterOptions()
@@ -985,6 +988,27 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping
 			continue
 		}
 		snapshot := snapshots.Bucket(k)
+		metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
+		if metaBucket == nil {
+			return fmt.Errorf("meta-data bucket missing")
+		}
+
+		writer, err := util.NewFileWriter(util.BoltWriterContext)
+		if err != nil {
+			return fmt.Errorf("unable to load correct writer: %v", err)
+		}
+
+		readerId := string(metaBucket.Get(util.BoltMetaDataWriterIdKey))
+		reader, err := util.NewFileReader(readerId, util.BoltWriterContext)
+		if err != nil {
+			return fmt.Errorf("unable to load correct reader: %v", err)
+		}
+
+		err = metaBucket.Put(util.BoltMetaDataWriterIdKey, []byte(writer.Id()))
+		if err != nil {
+			return err
+		}
+
 		cc := snapshot.Cursor()
 		for kk, _ := cc.First(); kk != nil; kk, _ = cc.Next() {
 			if kk[0] == util.BoltInternalKey[0] {
@@ -992,19 +1016,50 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping
 				if internalBucket == nil {
 					return fmt.Errorf("segment key, but bucket missing %x", kk)
 				}
-				err = internalBucket.Put(util.MappingInternalKey, mappingBytes)
+
+				// read out every internal value first; a bolt bucket must
+				// not be modified while it is being iterated
+				internalVals := make(map[string][]byte)
+				err := internalBucket.ForEach(func(key []byte, val []byte) error {
+					copiedVal, err := reader.Process(append([]byte(nil), val...))
+					if err != nil {
+						return err
+					}
+					internalVals[string(key)] = copiedVal
+					return nil
+				})
 				if err != nil {
 					return err
 				}
+
+				// the mapping key always receives the updated mapping bytes;
+				// every other value is re-processed unchanged
+				internalVals[string(util.MappingInternalKey)] = mappingBytes
+				for key, val := range internalVals {
+					err = internalBucket.Put([]byte(key), writer.Process(val))
+					if err != nil {
+						return err
+					}
+				}
 			} else if kk[0] != util.BoltMetaDataKey[0] {
 				segmentBucket := snapshot.Bucket(kk)
 				if segmentBucket == nil {
 					return fmt.Errorf("segment key, but bucket missing %x", kk)
 				}
+
 				var updatedFields map[string]*index.UpdateFieldInfo
 				updatedFieldBytes := segmentBucket.Get(util.BoltUpdatedFieldsKey)
 				if updatedFieldBytes != nil {
-					err := json.Unmarshal(updatedFieldBytes, &updatedFields)
+					buf, err := reader.Process(updatedFieldBytes)
+					if err != nil {
+						return err
+					}
+
+					err = json.Unmarshal(buf, &updatedFields)
 					if err != nil {
 						return fmt.Errorf("error reading updated field bytes: %v", err)
 					}
@@ -1027,13 +1082,156 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping
 				if err != nil {
 					return err
 				}
-				err = segmentBucket.Put(util.BoltUpdatedFieldsKey, b)
+				buf := writer.Process(b)
+				err = segmentBucket.Put(util.BoltUpdatedFieldsKey, buf)
 				if err != nil {
 					return err
 				}
+
+				deletedBytes := segmentBucket.Get(util.BoltDeletedKey)
+				if deletedBytes != nil {
+					deletedBytes, err = reader.Process(deletedBytes)
+					if err != nil {
+						return err
+					}
+
+					buf := writer.Process(deletedBytes)
+					err = segmentBucket.Put(util.BoltDeletedKey, buf)
+					if err != nil {
+						return err
+					}
+				}
+
+				statBytes := segmentBucket.Get(util.BoltStatsKey)
+				if statBytes != nil {
+					statBytes, err = reader.Process(statBytes)
+					if err != nil {
+						return err
+					}
+
+					buf := writer.Process(statBytes)
+					err = segmentBucket.Put(util.BoltStatsKey, buf)
+					if err != nil {
+						return err
+					}
+				}
 			}
 		}
 	}
 	return nil
 	})
 }
+
+// KeysInUse returns the callback key ids referenced by the in-memory
+// segments and by the persisted bolt snapshots.
+func (s *Scorch) KeysInUse() (map[string]struct{}, error) {
+	s.rootLock.RLock()
+	defer s.rootLock.RUnlock()
+
+	keyMap := make(map[string]struct{})
+	for _, segmentSnapShot := range s.root.segment {
+		if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok {
+			keyMap[seg.CallbackId()] = struct{}{}
+		}
+	}
+
+	boltKeys, err := s.boltKeysInUse()
+	if err != nil {
+		return nil, err
+	}
+	for _, k := range boltKeys {
+		keyMap[k] = struct{}{}
+	}
+
+	return keyMap, nil
+}
+
+// DropKeys rewrites all bolt data protected by the given callback key ids
+// and force-merges any segments still referencing them.
+func (s *Scorch) DropKeys(ids map[string]struct{}) error {
+	err := s.removeBoltKeys(ids)
+	if err != nil {
+		return err
+	}
+
+	// collect the affected segments under the lock, but release it before
+	// queueing the force merge so the merger goroutine is not blocked
+	s.rootLock.Lock()
+	segsToCompact := make([]mergeplan.Segment, 0)
+	for _, segmentSnapShot := range s.root.segment {
+		if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok {
+			if _, ok := ids[seg.CallbackId()]; ok {
+				segsToCompact = append(segsToCompact, segmentSnapShot)
+			}
+		}
+	}
+	s.rootLock.Unlock()
+
+	if len(segsToCompact) > 0 {
+		return s.forceMergeSegs(segsToCompact)
+	}
+
+	return nil
+}
+
+// Force merge all given segments regardless of their eligibility for compaction.
+// Large segments will be rewritten instead of merged.
+func (s *Scorch) forceMergeSegs(segsToCompact []mergeplan.Segment) error {
+	// Create a merge plan with the filtered segments and force a merge
+	// to remove the callback from the segments.
+	mergePlannerOptions, err := s.parseMergePlannerOptions()
+	if err != nil {
+		return fmt.Errorf("mergePlannerOption json parsing err: %v", err)
+	}
+
+	atomic.AddUint64(&s.stats.TotFileMergePlan, 1)
+
+	mergePlan, err := mergeplan.Plan(segsToCompact, mergePlannerOptions)
+	if err != nil {
+		atomic.AddUint64(&s.stats.TotFileMergePlanErr, 1)
+		return fmt.Errorf("merge plan creation err: %v", err)
+	}
+
+	// track which segments the planner left out, so they can be added to
+	// the plan as single-segment (rewrite) tasks below
+	segDictionary := make(map[uint64]bool)
+	for _, seg := range segsToCompact {
+		segDictionary[seg.Id()] = true
+	}
+
+	if mergePlan == nil {
+		mergePlan = &mergeplan.MergePlan{
+			Tasks: make([]*mergeplan.MergeTask, 0),
+		}
+	}
+
+	for _, task := range mergePlan.Tasks {
+		for _, seg := range task.Segments {
+			segDictionary[seg.Id()] = false
+		}
+	}
+
+	for _, seg := range segsToCompact {
+		if segDictionary[seg.Id()] {
+			mergePlan.Tasks = append(mergePlan.Tasks, &mergeplan.MergeTask{
+				Segments: []mergeplan.Segment{seg},
+			})
+		}
+	}
+
+	atomic.AddUint64(&s.stats.TotFileMergePlanOk, 1)
+	atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(mergePlan.Tasks)))
+
+	s.forceMergeRequestCh <- &mergerCtrl{
+		plan: mergePlan,
+		ctx:  context.Background(),
+	}
+
+	return nil
+}
diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go
index c09a7db40..07f5de2dc 100644
--- a/index/scorch/snapshot_index.go
+++ b/index/scorch/snapshot_index.go
@@ -91,6 +91,8 @@ type IndexSnapshot struct {
 	// UpdateFieldInfo.Index or .Store or .DocValues).
 	// Used to short circuit queries trying to read stale data
 	updatedFields map[string]*index.UpdateFieldInfo
+
+	// id of the writer callback this snapshot was persisted with
+	writerId string
 }
 
 func (i *IndexSnapshot) Segments() []*SegmentSnapshot {
diff --git a/index_impl.go b/index_impl.go
index a43b3cf75..f145cb121 100644
--- a/index_impl.go
+++ b/index_impl.go
@@ -45,14 +45,16 @@ import (
 )
 
 type indexImpl struct {
-	path  string
-	name  string
-	meta  *indexMeta
-	i     index.Index
-	m     mapping.IndexMapping
-	mutex sync.RWMutex
-	open  bool
-	stats *IndexStat
+	path   string
+	name   string
+	meta   *indexMeta
+	i      index.Index
+	m      mapping.IndexMapping
+	mutex  sync.RWMutex
+	open   bool
+	stats  *IndexStat
+	writer *util.FileWriter
+	reader *util.FileReader
 }
 
 const storePath = "store"
@@ -88,16 +90,28 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string,
 		return nil, fmt.Errorf("bleve not configured for file based indexing")
 	}
 
+	fileWriter, err := util.NewFileWriter([]byte(metaFilename))
+	if err != nil {
+		return nil, err
+	}
+	fileReader, err := util.NewFileReader(fileWriter.Id(), []byte(metaFilename))
+	if err != nil {
+		return nil, err
+	}
+
+	// kvconfig may be nil; it is written to below, so make sure it exists
+	if kvconfig == nil {
+		kvconfig = map[string]interface{}{}
+	}
+
 	rv := indexImpl{
-		path: path,
-		name: path,
-		m:    mapping,
-		meta: newIndexMeta(indexType, kvstore, kvconfig),
+		path:   path,
+		name:   path,
+		m:      mapping,
+		meta:   newIndexMeta(indexType, kvstore, kvconfig),
+		writer: fileWriter,
+		reader: fileReader,
 	}
 	rv.stats = &IndexStat{i: &rv}
 	// at this point there is hope that we can be successful, so save index meta
 	if path != "" {
-		err = rv.meta.Save(path)
+		kvconfig["callback_id"] = rv.writer.Id()
+		err = rv.meta.Save(path, rv.writer)
 		if err != nil {
 			return nil, err
 		}
@@ -153,7 +167,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
 	}
 	rv.stats = &IndexStat{i: rv}
 
-	rv.meta, err = openIndexMeta(path)
+	rv.meta, rv.reader, err = openIndexMeta(path)
 	if err != nil {
 		return nil, err
 	}
@@ -480,6 +494,44 @@ func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) {
 	return i.SearchInContext(context.Background(), req)
 }
 
+// KeysInUse reports the callback key ids referenced by this index,
+// including the key protecting the index meta file.
+func (i *indexImpl) KeysInUse() (map[string]struct{}, error) {
+	keys := map[string]struct{}{}
+	keys[i.reader.Id()] = struct{}{}
+
+	if cidx, ok := i.i.(index.CustomizableIndex); ok {
+		cKeys, err := cidx.KeysInUse()
+		if err != nil {
+			return nil, err
+		}
+		for k := range cKeys {
+			keys[k] = struct{}{}
+		}
+	}
+
+	return keys, nil
+}
+
+// DropKeys re-encodes anything protected by the given callback key ids
+// under a freshly acquired writer.
+func (i *indexImpl) DropKeys(keys map[string]struct{}) error {
+	if _, ok := keys[i.reader.Id()]; ok {
+		err := i.meta.UpdateWriter(i.path)
+		if err != nil {
+			return err
+		}
+	}
+
+	if cidx, ok := i.i.(index.CustomizableIndex); ok {
+		return cidx.DropKeys(keys)
+	} else if _, ok := keys[""]; ok {
+		return fmt.Errorf("underlying index does not support DropKeys")
+	}
+
+	return nil
+}
+
 var (
 	documentMatchEmptySize int
 	searchContextEmptySize int
@@ -745,7 +797,6 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
 	if !contextScoreFusionKeyExists {
 		setKnnHitsInCollector(knnHits, req, coll)
 	}
-
 	if fts != nil {
 		if is, ok := indexReader.(*scorch.IndexSnapshot); ok {
diff --git a/index_meta.go b/index_meta.go
index 14b88dcbc..2ec068871 100644
--- a/index_meta.go
+++ b/index_meta.go
@@ -15,6 +15,7 @@ package bleve
 
 import (
+	"encoding/binary"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -40,27 +41,54 @@ func newIndexMeta(indexType string, storage string, config map[string]interface{
 	}
 }
 
-func openIndexMeta(path string) (*indexMeta, error) {
+func openIndexMeta(path string) (*indexMeta, *util.FileReader, error) {
 	if _, err := os.Stat(path); os.IsNotExist(err) {
-		return nil, ErrorIndexPathDoesNotExist
+		return nil, nil, ErrorIndexPathDoesNotExist
 	}
 	indexMetaPath := indexMetaPath(path)
 	metaBytes, err := os.ReadFile(indexMetaPath)
 	if err != nil {
-		return nil, ErrorIndexMetaMissing
+		return nil, nil, ErrorIndexMetaMissing
 	}
+
 	var im indexMeta
+	fileReader := &util.FileReader{}
 	err = util.UnmarshalJSON(metaBytes, &im)
 	if err != nil {
-		return nil, ErrorIndexMetaCorrupt
+		// not plain JSON: the file may carry a trailer consisting of the
+		// writer id followed by a big-endian uint32 holding the id's length
+		if len(metaBytes) < 4 {
+			return nil, nil, ErrorIndexMetaCorrupt
+		}
+
+		pos := len(metaBytes) - 4
+		writerIdLen := int(binary.BigEndian.Uint32(metaBytes[pos:]))
+		pos -= writerIdLen
+		if pos < 0 {
+			return nil, nil, ErrorIndexMetaCorrupt
+		}
+
+		writerId := metaBytes[pos : pos+writerIdLen]
+		fileReader, err = util.NewFileReader(string(writerId), []byte(metaFilename))
+		if err != nil {
+			return nil, nil, err
+		}
+
+		buf, err := fileReader.Process(metaBytes[0:pos])
+		if err != nil {
+			return nil, nil, err
+		}
+		err = util.UnmarshalJSON(buf, &im)
+		if err != nil {
+			return nil, nil, ErrorIndexMetaCorrupt
+		}
 	}
+
 	if im.IndexType == "" {
 		im.IndexType = upsidedown.Name
 	}
-	return &im, nil
+	return &im, fileReader, nil
 }
 
-func (i *indexMeta) Save(path string) (err error) {
+func (i *indexMeta) Save(path string, writer *util.FileWriter) (err error) {
 	indexMetaPath := indexMetaPath(path)
 	// ensure any necessary parent directories exist
 	err = os.MkdirAll(path, 0700)
@@ -86,10 +114,24 @@ func (i *indexMeta) Save(path string) (err error) {
 			err = ierr
 		}
 	}()
+
+	metaBytes = writer.Process(metaBytes)
+
 	_, err = indexMetaFile.Write(metaBytes)
 	if err != nil {
 		return err
 	}
+
+	// append the writer id and its length so openIndexMeta can recover them
+	_, err = indexMetaFile.Write([]byte(writer.Id()))
+	if err != nil {
+		return err
+	}
+
+	err = binary.Write(indexMetaFile, binary.BigEndian, uint32(len(writer.Id())))
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -110,6 +152,52 @@ func (i *indexMeta) CopyTo(d index.Directory) (err error) {
 	return err
 }
 
+// UpdateWriter re-encodes the meta file with a newly acquired writer,
+// replacing the previous writer id trailer.
+func (i *indexMeta) UpdateWriter(path string) error {
+	indexMetaPath := indexMetaPath(path)
+	metaBytes, err := os.ReadFile(indexMetaPath)
+	if err != nil {
+		return ErrorIndexMetaMissing
+	}
+
+	if len(metaBytes) < 4 {
+		return ErrorIndexMetaCorrupt
+	}
+
+	pos := len(metaBytes) - 4
+	writerIdLen := int(binary.BigEndian.Uint32(metaBytes[pos:]))
+	pos -= writerIdLen
+	if pos < 0 {
+		return ErrorIndexMetaCorrupt
+	}
+
+	writerId := metaBytes[pos : pos+writerIdLen]
+	fileReader, err := util.NewFileReader(string(writerId), []byte(metaFilename))
+	if err != nil {
+		return err
+	}
+
+	metaBytes, err = fileReader.Process(metaBytes[0:pos])
+	if err != nil {
+		return err
+	}
+
+	writer, err := util.NewFileWriter([]byte(metaFilename))
+	if err != nil {
+		return err
+	}
+
+	metaBytes = writer.Process(metaBytes)
+
+	metaBytes = append(metaBytes, []byte(writer.Id())...)
+
+	// append the new writer id's length as a big-endian uint32 trailer
+	// (appending, rather than overwriting the front of the buffer)
+	lenBuf := make([]byte, 4)
+	binary.BigEndian.PutUint32(lenBuf, uint32(len(writer.Id())))
+	metaBytes = append(metaBytes, lenBuf...)
+
+	err = os.WriteFile(indexMetaPath, metaBytes, 0666)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func indexMetaPath(path string) string {
 	return filepath.Join(path, metaFilename)
 }
diff --git a/index_meta_test.go b/index_meta_test.go
index 7719f577b..4f8f3f3cb 100644
--- a/index_meta_test.go
+++ b/index_meta_test.go
@@ -17,6 +17,8 @@ package bleve
 import (
 	"os"
 	"testing"
+
+	"github.com/blevesearch/bleve/v2/util"
 )
 
 func TestIndexMeta(t *testing.T) {
@@ -29,21 +31,25 @@ func TestIndexMeta(t *testing.T) {
 	}()
 
 	// open non-existent meta should give an error
-	_, err := openIndexMeta(testIndexPath)
+	_, _, err := openIndexMeta(testIndexPath)
 	if err == nil {
 		t.Errorf("expected error, got nil")
 	}
 
+	writer, err := util.NewFileWriter([]byte(metaFilename))
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	// create meta
 	im := &indexMeta{Storage: "boltdb"}
-	err = im.Save(testIndexPath)
+	err = im.Save(testIndexPath, writer)
 	if err != nil {
 		t.Error(err)
 	}
 	im = nil
 
 	// open a meta that exists
-	im, err = openIndexMeta(testIndexPath)
+	im, _, err = openIndexMeta(testIndexPath)
 	if err != nil {
 		t.Error(err)
 	}
@@ -52,7 +58,7 @@ func TestIndexMeta(t *testing.T) {
 	}
 
 	// save a meta that already exists
-	err = im.Save(testIndexPath)
+	err = im.Save(testIndexPath, writer)
 	if err == nil {
 		t.Errorf("expected error, got nil")
 	}
diff --git a/search/searcher/base_test.go b/search/searcher/base_test.go
index 6f80bf653..5b84007a5 100644
--- a/search/searcher/base_test.go
+++ b/search/searcher/base_test.go
@@ -67,8 +67,16 @@ func initTwoDocs(twoDocIndex index.Index) {
 		panic(err)
 	}
 	batch := index.NewBatch()
+
+	// Make a copy of the documents to prevent modification of the
+	// original slice across multiple tests
+	docs := []document.Document{}
 	for _, doc := range twoDocIndexDocs {
-		batch.Update(doc)
+		docs = append(docs, *doc)
+	}
+
+	// index the copies, addressing slice elements rather than the loop
+	// variable so each batch entry points at a distinct document
+	for i := range docs {
+		batch.Update(&docs[i])
 	}
 	err = twoDocIndex.Batch(batch)
 	if err != nil {
diff --git a/util/callbacks.go b/util/callbacks.go
new file mode 100644
index 000000000..44ad4638e
--- /dev/null
+++ b/util/callbacks.go
@@ -0,0 +1,170 @@
+package util
+
+// WriterHook, when set, supplies a callback id plus a processing function
+// applied to file bytes (for example, to encrypt them) before they are
+// written. The context identifies the kind of file being written.
+var WriterHook func(context []byte) (string, func(data []byte) []byte, error)
+
+// ReaderHook, when set, returns the inverse processing function for the
+// given callback id and context, applied to restore bytes on read.
+var ReaderHook func(id string, context []byte) (func(data []byte) ([]byte, error), error)
+
+func init() {
+	// Variables used for development and testing purposes
+	// encryptionKey := make([]byte, 32)
+	// if _, err := rand.Read(encryptionKey); err != nil {
+	// 	panic("failed to generate AES key: " + err.Error())
+	// }
+
+	// key := make([]byte, 32)
+	// keyId := "test-key-id"
+	// label := []byte("search")
+
+	// if _, err := rand.Read(key); err != nil {
+	// 	panic("Failed to generate random key: " + err.Error())
+	// }
+
+	// WriterHook = func(context []byte) (string, func(data []byte) []byte, error) {
+	// 	derivedKey := make([]byte, 32)
+	// 	derivedKey, err := crypto.OpenSSLKBKDFDeriveKey(key, label, context, derivedKey, "SHA2-256", "")
+	// 	if err != nil {
+	// 		return "", nil, err
+	// 	}
+
+	// 	block, err := aes.NewCipher(derivedKey)
+	// 	if err != nil {
+	// 		panic("Failed to create AES cipher: " + err.Error())
+	// 	}
+
+	// 	aesgcm, err := cipher.NewGCM(block)
+	// 	if err != nil {
+	// 		panic("Failed to create AES GCM: " + err.Error())
+	// 	}
+
+	// 	nonce := make([]byte, 12)
+	// 	if _, err := rand.Read(nonce); err != nil {
+	// 		panic("Failed to generate random nonce: " + err.Error())
+	// 	}
+
+	// 	writerCallback := func(data []byte) []byte {
+	// 		ciphertext := aesgcm.Seal(nil, nonce, data, nil)
+	// 		result := append(ciphertext, nonce...)
+
+	// 		for i := len(nonce) - 1; i >= 0; i-- {
+	// 			if nonce[i] < 255 {
+	// 				nonce[i]++
+	// 				break
+	// 			}
+	// 			nonce[i] = 0
+	// 		}
+	// 		return result
+	// 	}
+
+	// 	return keyId, writerCallback, nil
+	// }
+
+	// ReaderHook = func(id string, context []byte) (func(data []byte) ([]byte, error), error) {
+	// 	if id != keyId {
+	// 		return nil, fmt.Errorf("unknown callback ID: %s", id)
+	// 	}
+
+	// 	derivedKey := make([]byte, 32)
+	// 	derivedKey, err := crypto.OpenSSLKBKDFDeriveKey(key, label, context, derivedKey, "SHA2-256", "")
+	// 	if err != nil {
+	// 		return nil, err
+	// 	}
+
+	// 	block, err := aes.NewCipher(derivedKey)
+	// 	if err != nil {
+	// 		panic("Failed to create AES cipher: " + err.Error())
+	// 	}
+
+	// 	aesgcm, err := cipher.NewGCM(block)
+	// 	if err != nil {
+	// 		panic("Failed to create AES GCM: " + err.Error())
+	// 	}
+
+	// 	readerCallback := func(data []byte) ([]byte, error) {
+	// 		if len(data) < 12 {
+	// 			return nil, fmt.Errorf("ciphertext too short")
+	// 		}
+
+	// 		nonce := data[len(data)-12:]
+	// 		ciphertext := data[:len(data)-12]
+	// 		plaintext, err := aesgcm.Open(nil, nonce, ciphertext, nil)
+	// 		if err != nil {
+	// 			return nil, fmt.Errorf("failed to decrypt data: %w", err)
+	// 		}
+
+	// 		return plaintext, nil
+	// 	}
+
+	// 	return readerCallback, nil
+	// }
+
+	// zapv16.WriterHook = WriterHook
+	// zapv16.ReaderHook = ReaderHook
+}
+
+// FileWriter applies the registered WriterHook processing function, if any,
+// to bytes before they are persisted.
+type FileWriter struct {
+	processor func(data []byte) []byte
+	context   []byte
+	id        string
+}
+
+// NewFileWriter acquires a writer, and its callback id, for the given context.
+func NewFileWriter(context []byte) (*FileWriter, error) {
+	rv := &FileWriter{
+		context: context,
+	}
+
+	if WriterHook != nil {
+		var err error
+		rv.id, rv.processor, err = WriterHook(rv.context)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return rv, nil
+}
+
+// Process transforms data with the writer callback; it is the identity
+// when no hook is registered.
+func (w *FileWriter) Process(data []byte) []byte {
+	if w.processor != nil {
+		return w.processor(data)
+	}
+	return data
+}
+
+func (w *FileWriter) Id() string {
+	return w.id
+}
+
+// FileReader applies the inverse processing function for a previously
+// written callback id, restoring bytes on read.
+type FileReader struct {
+	processor func(data []byte) ([]byte, error)
+	id        string
+	context   []byte
+}
+
+// NewFileReader acquires the reader for the given callback id and context.
+func NewFileReader(id string, context []byte) (*FileReader, error) {
+	rv := &FileReader{
+		id:      id,
+		context: context,
+	}
+
+	if ReaderHook != nil {
+		var err error
+		rv.processor, err = ReaderHook(id, context)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return rv, nil
+}
+
+// Process restores data with the reader callback; it is the identity
+// when no hook is registered.
+func (r *FileReader) Process(data []byte) ([]byte, error) {
+	if r.processor != nil {
+		return r.processor(data)
+	}
+	return data, nil
+}
+
+func (r *FileReader) Id() string {
+	return r.id
+}
diff --git a/util/keys.go b/util/keys.go
index b71a7f48b..cd994c970 100644
--- a/util/keys.go
+++ b/util/keys.go
@@ -27,6 +27,9 @@ var (
 	BoltStatsKey = []byte("stats")
 	BoltUpdatedFieldsKey = []byte("fields")
 	TotBytesWrittenKey = []byte("TotBytesWritten")
+	BoltMetaDataWriterIdKey = []byte("writerId")
 	MappingInternalKey = []byte("_mapping")
+
+	BoltWriterContext = []byte("bolt")
 )
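For reviewers trying this out end to end, the sketch below shows one way a consumer could register the new `util.WriterHook`/`util.ReaderHook` pair and then rotate keys through the `CustomizableIndex` methods added above. It is illustrative only: the XOR transform stands in for a real cipher (see the commented-out AES-GCM reference code in `util/callbacks.go`), and the `demo-key-1` id and `example.bleve` path are made up.

```go
package main

import (
	"fmt"

	bleve "github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/util"
)

// xor is a toy, self-inverse transform standing in for real encryption.
func xor(data []byte) []byte {
	out := make([]byte, len(data))
	for i, b := range data {
		out[i] = b ^ 0x5a
	}
	return out
}

func main() {
	// The hooks must be registered before any index is created or opened.
	util.WriterHook = func(context []byte) (string, func(data []byte) []byte, error) {
		// a real hook would derive a per-context key and return its id
		return "demo-key-1", xor, nil
	}
	util.ReaderHook = func(id string, context []byte) (func(data []byte) ([]byte, error), error) {
		if id != "demo-key-1" {
			return nil, fmt.Errorf("unknown callback ID: %s", id)
		}
		return func(data []byte) ([]byte, error) {
			return xor(data), nil
		}, nil
	}

	idx, err := bleve.New("example.bleve", bleve.NewIndexMapping())
	if err != nil {
		panic(err)
	}
	defer idx.Close()

	// Key rotation: ask the index which callback ids still protect data,
	// then rewrite everything they protect. Once the hook starts handing
	// out a new id, dropping the old ids re-encodes the meta file and the
	// bolt snapshots, and force-merges any affected segments.
	if ci, ok := idx.(bleve.CustomizableIndex); ok {
		inUse, err := ci.KeysInUse()
		if err != nil {
			panic(err)
		}
		if err := ci.DropKeys(inUse); err != nil {
			panic(err)
		}
	}
}
```

The design choice here is that key management stays entirely outside bleve: the library only remembers opaque callback ids alongside the data they protect, and `KeysInUse`/`DropKeys` give the host application the handle it needs to drive rotation.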