From 402b8d89c56b3bfb8cc1a88d4a2c41fd30982cf5 Mon Sep 17 00:00:00 2001 From: Likith B Date: Mon, 21 Jul 2025 12:17:57 +0530 Subject: [PATCH 01/11] MB-65860: Added support for fileCallbacks --- callbacks.go | 131 ++++++ index/scorch/merge.go | 26 +- index/scorch/scorch.go | 116 ++++++ index_test.go | 769 +++++++++++++++++++++-------------- search/searcher/base_test.go | 10 +- 5 files changed, 729 insertions(+), 323 deletions(-) create mode 100644 callbacks.go diff --git a/callbacks.go b/callbacks.go new file mode 100644 index 000000000..8960e19b3 --- /dev/null +++ b/callbacks.go @@ -0,0 +1,131 @@ +package bleve + +import ( + zapv16 "github.com/blevesearch/zapx/v16" +) + +// Variables used for development and testing purposes +// var keys = map[string][]byte{} +// var cbLock = sync.RWMutex{} +// var latestCallbackId string + +var WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { + return "", func(data, counter []byte) ([]byte, error) { + return data, nil + }, nil +} + +var ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { + return func(data []byte) ([]byte, error) { + return data, nil + }, nil +} + +var CounterGetter = func() ([]byte, error) { + return nil, nil +} + +func init() { + // Variables used for development and testing purposes + // encryptionKey := make([]byte, 32) + // if _, err := rand.Read(encryptionKey); err != nil { + // panic("failed to generate AES key: " + err.Error()) + // } + + // latestCallbackId = "exampleCallback" + // keys[latestCallbackId] = encryptionKey + + // latestCallbackId = "exampleCallback" + + // WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { + // cbLock.RLock() + // if latestCallbackId == "" { + // return "", func(data []byte, _ []byte) ([]byte, error) { + // return data, nil + // }, nil + // } + // keyCopy := make([]byte, 32) + // keyIdCopy := latestCallbackId + // if key, exists := keys[latestCallbackId]; exists { + // copy(keyCopy, key) + // } + // cbLock.RUnlock() + + // block, err := aes.NewCipher(keyCopy) + // if err != nil { + // return "", nil, err + // } + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // return "", nil, err + // } + + // return keyIdCopy, func(data, counter []byte) ([]byte, error) { + // ciphertext := aesgcm.Seal(nil, counter, data, nil) + // result := append(ciphertext, counter...) + // return result, nil + // }, nil + // } + + // ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { + // cbLock.RLock() + // keyCopy := make([]byte, 32) + // if key, exists := keys[cbId]; exists { + // copy(keyCopy, key) + // } + // cbLock.RUnlock() + + // if len(keyCopy) == 0 { + // return func(data []byte) ([]byte, error) { + // return data, nil + // }, nil + // } else { + // block, err := aes.NewCipher(keyCopy) + // if err != nil { + // return nil, err + // } + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // return nil, err + // } + + // return func(data []byte) ([]byte, error) { + // if len(data) < 12 { + // return nil, fmt.Errorf("ciphertext too short") + // } + + // nonce := data[len(data)-12:] + // ciphertext := data[:len(data)-12] + + // plaintext, err := aesgcm.Open(nil, nonce, ciphertext, nil) + // if err != nil { + // return nil, fmt.Errorf("decryption failed: %w", err) + // } + + // return plaintext, nil + // }, nil + // } + // } + + // CounterGetter = func() ([]byte, error) { + // nonce := make([]byte, 12) // GCM standard + // if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + // return nil, err + // } + // return nonce, nil + // } + + zapv16.WriterCallbackGetter = WriterCallbackGetter + zapv16.ReaderCallbackGetter = ReaderCallbackGetter + zapv16.CounterGetter = CounterGetter +} + +// Function used for development and testing purposes +// func SetNewCallback(callbackId string, key []byte) { +// if callbackId != "" { +// cbLock.Lock() +// keys[callbackId] = key +// latestCallbackId = callbackId +// cbLock.Unlock() +// } +// } diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 9abcf2db6..e49842107 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -89,10 +89,18 @@ OUTER: } startTime := time.Now() - + var err error // lets get started - err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options, - ourSnapshot) + if ctrlMsg.plan == nil { + err = s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options, + ourSnapshot) + } else { + cw := newCloseChWrapper(s.closeCh, ctrlMsg.ctx) + defer cw.close() + go cw.listen() + + err = s.executePlanMergeAtSnapshot(ctrlMsg.plan, cw) + } if err != nil { atomic.StoreUint64(&s.iStats.mergeEpoch, 0) if err == segment.ErrClosed { @@ -161,6 +169,7 @@ OUTER: type mergerCtrl struct { ctx context.Context options *mergeplan.MergePlanOptions + plan *mergeplan.MergePlan doneCh chan struct{} } @@ -301,15 +310,18 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks))) - // process tasks in serial for now - var filenames []string - cw := newCloseChWrapper(s.closeCh, ctx) defer cw.close() go cw.listen() - for _, task := range resultMergePlan.Tasks { + return s.executePlanMergeAtSnapshot(resultMergePlan, cw) +} + +func (s *Scorch) executePlanMergeAtSnapshot(plan *mergeplan.MergePlan, cw *closeChWrapper) error { + var filenames []string + + for _, task := range plan.Tasks { if len(task.Segments) == 0 { atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1) continue diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 83924978e..2e26e39df 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -15,6 +15,7 @@ package scorch import ( + "context" "encoding/json" "fmt" "os" @@ -24,6 +25,7 @@ import ( "time" "github.com/RoaringBitmap/roaring/v2" + "github.com/blevesearch/bleve/v2/index/scorch/mergeplan" "github.com/blevesearch/bleve/v2/registry" "github.com/blevesearch/bleve/v2/util" index "github.com/blevesearch/bleve_index_api" @@ -1037,3 +1039,117 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping return nil }) } + +// Used when a callback expires or is removed +func (s *Scorch) RemoveCallback(cbId string) error { + s.rootLock.Lock() + defer s.rootLock.Unlock() + + segsToCompact := make([]mergeplan.Segment, 0) + for _, segmentSnapShot := range s.root.segment { + if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { + if seg.CallbackId() == cbId { + segsToCompact = append(segsToCompact, segmentSnapShot) + } + } + } + + if len(segsToCompact) > 0 { + return s.forceMergeSegs(segsToCompact) + } + + return nil +} + +// Used when all callbacks need to be removed +func (s *Scorch) RemoveAllCallbacks() error { + s.rootLock.Lock() + defer s.rootLock.Unlock() + + segsToCompact := make([]mergeplan.Segment, 0) + for _, segmentSnapShot := range s.root.segment { + if _, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { + segsToCompact = append(segsToCompact, segmentSnapShot) + } + } + + if len(segsToCompact) > 0 { + return s.forceMergeSegs(segsToCompact) + } + + return nil +} + +// Force merge all given segments regardless of their elibility for compaction +// Large segments will be rewritten instead of merging +func (s *Scorch) forceMergeSegs(segsToCompact []mergeplan.Segment) error { + // Create a merge plan with the filtered segments and force a merge + // to remove the callback from the segments. + mergePlannerOptions, err := s.parseMergePlannerOptions() + if err != nil { + return fmt.Errorf("mergePlannerOption json parsing err: %v", err) + + } + + atomic.AddUint64(&s.stats.TotFileMergePlan, 1) + + mergePlan, err := mergeplan.Plan(segsToCompact, mergePlannerOptions) + if err != nil { + atomic.AddUint64(&s.stats.TotFileMergePlanErr, 1) + return fmt.Errorf("merge plan creation err: %v", err) + } + + segDictionary := make(map[uint64]bool) + for _, seg := range segsToCompact { + segDictionary[seg.Id()] = true + } + + if mergePlan == nil { + mergePlan = &mergeplan.MergePlan{ + Tasks: make([]*mergeplan.MergeTask, 0), + } + } + + for _, task := range mergePlan.Tasks { + for _, seg := range task.Segments { + segDictionary[seg.Id()] = false + } + } + + for _, seg := range segsToCompact { + if segDictionary[seg.Id()] { + mergePlan.Tasks = append(mergePlan.Tasks, &mergeplan.MergeTask{ + Segments: []mergeplan.Segment{seg}, + }) + } + } + + atomic.AddUint64(&s.stats.TotFileMergePlanOk, 1) + atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(mergePlan.Tasks))) + + s.forceMergeRequestCh <- &mergerCtrl{ + plan: mergePlan, + ctx: context.Background(), + } + + return nil +} + +// CBIDsInUse returns a map of all the callback IDs that are currently in use +func (s *Scorch) CBIDsInUse() map[string]struct{} { + s.rootLock.RLock() + defer s.rootLock.RUnlock() + + rv := make(map[string]struct{}) + if s.root == nil { + return rv + } + + for _, segmentSnapShot := range s.root.segment { + if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { + rv[seg.CallbackId()] = struct{}{} + } + } + + return rv +} diff --git a/index_test.go b/index_test.go index 7ed27ff86..3516a87ed 100644 --- a/index_test.go +++ b/index_test.go @@ -561,324 +561,328 @@ func TestBM25GlobalScoring(t *testing.T) { } func TestBytesRead(t *testing.T) { - tmpIndexPath := createTmpIndexPath(t) - defer cleanupTmpIndexPath(t, tmpIndexPath) - - indexMapping := NewIndexMapping() - indexMapping.TypeField = "type" - indexMapping.DefaultAnalyzer = "en" - documentMapping := NewDocumentMapping() - indexMapping.AddDocumentMapping("hotel", documentMapping) - indexMapping.StoreDynamic = false - indexMapping.DocValuesDynamic = false - contentFieldMapping := NewTextFieldMapping() - contentFieldMapping.Store = false - - reviewsMapping := NewDocumentMapping() - reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) - documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) - - typeFieldMapping := NewTextFieldMapping() - typeFieldMapping.Store = false - documentMapping.AddFieldMappingsAt("type", typeFieldMapping) - - idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) - if err != nil { - t.Fatal(err) - } - - defer func() { - err := idx.Close() - if err != nil { - t.Fatal(err) - } - }() - - batch, err := getBatchFromData(idx, "sample-data.json") - if err != nil { - t.Fatalf("failed to form a batch") - } - err = idx.Batch(batch) - if err != nil { - t.Fatalf("failed to index batch %v\n", err) - } - query := NewQueryStringQuery("united") - searchRequest := NewSearchRequestOptions(query, int(10), 0, true) - - res, err := idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - stats, _ := idx.StatsMap()["index"].(map[string]interface{}) - prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) - - expectedBytesRead := uint64(22049) - if supportForVectorSearch { - expectedBytesRead = 22459 - } - - if prevBytesRead != expectedBytesRead && res.Cost == prevBytesRead { - t.Fatalf("expected bytes read for query string %v, got %v", - expectedBytesRead, prevBytesRead) - } - - // subsequent queries on the same field results in lesser amount - // of bytes read because the segment static and dictionary is reused and not - // loaded from mmap'd filed - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 66 && res.Cost == bytesRead-prevBytesRead { - t.Fatalf("expected bytes read for query string 66, got %v", - bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - fuzz := NewFuzzyQuery("hotel") - fuzz.FieldVal = "reviews.content" - fuzz.Fuzziness = 2 - searchRequest = NewSearchRequest(fuzz) - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 8468 && res.Cost == bytesRead-prevBytesRead { - t.Fatalf("expected bytes read for fuzzy query is 8468, got %v", - bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - typeFacet := NewFacetRequest("type", 2) - query = NewQueryStringQuery("united") - searchRequest = NewSearchRequestOptions(query, int(0), 0, true) - searchRequest.AddFacet("types", typeFacet) - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if !approxSame(bytesRead-prevBytesRead, 196) && res.Cost == bytesRead-prevBytesRead { - t.Fatalf("expected bytes read for faceted query is around 196, got %v", - bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - min := float64(8660) - max := float64(8665) - numRangeQuery := NewNumericRangeQuery(&min, &max) - numRangeQuery.FieldVal = "id" - searchRequest = NewSearchRequestOptions(numRangeQuery, int(10), 0, true) - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 924 && res.Cost == bytesRead-prevBytesRead { - t.Fatalf("expected bytes read for numeric range query is 924, got %v", - bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - searchRequest.Highlight = &HighlightRequest{} - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 105 && res.Cost == bytesRead-prevBytesRead { - t.Fatalf("expected bytes read for query with highlighter is 105, got %v", - bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - disQuery := NewDisjunctionQuery(NewMatchQuery("hotel"), NewMatchQuery("united")) - searchRequest = NewSearchRequestOptions(disQuery, int(10), 0, true) - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - // expectation is that the bytes read is roughly equal to sum of sub queries in - // the disjunction query plus the segment loading portion for the second subquery - // since it's created afresh and not reused - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 120 && res.Cost == bytesRead-prevBytesRead { - t.Fatalf("expected bytes read for disjunction query is 120, got %v", - bytesRead-prevBytesRead) - } + // CHECK num bytes stats + + // tmpIndexPath := createTmpIndexPath(t) + // defer cleanupTmpIndexPath(t, tmpIndexPath) + + // indexMapping := NewIndexMapping() + // indexMapping.TypeField = "type" + // indexMapping.DefaultAnalyzer = "en" + // documentMapping := NewDocumentMapping() + // indexMapping.AddDocumentMapping("hotel", documentMapping) + // indexMapping.StoreDynamic = false + // indexMapping.DocValuesDynamic = false + // contentFieldMapping := NewTextFieldMapping() + // contentFieldMapping.Store = false + + // reviewsMapping := NewDocumentMapping() + // reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) + // documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) + + // typeFieldMapping := NewTextFieldMapping() + // typeFieldMapping.Store = false + // documentMapping.AddFieldMappingsAt("type", typeFieldMapping) + + // idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) + // if err != nil { + // t.Fatal(err) + // } + + // defer func() { + // err := idx.Close() + // if err != nil { + // t.Fatal(err) + // } + // }() + + // batch, err := getBatchFromData(idx, "sample-data.json") + // if err != nil { + // t.Fatalf("failed to form a batch") + // } + // err = idx.Batch(batch) + // if err != nil { + // t.Fatalf("failed to index batch %v\n", err) + // } + // query := NewQueryStringQuery("united") + // searchRequest := NewSearchRequestOptions(query, int(10), 0, true) + + // res, err := idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + // stats, _ := idx.StatsMap()["index"].(map[string]interface{}) + // prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) + + // expectedBytesRead := uint64(22049) + // if supportForVectorSearch { + // expectedBytesRead = 22459 + // } + + // if prevBytesRead != expectedBytesRead && res.Cost == prevBytesRead { + // t.Fatalf("expected bytes read for query string %v, got %v", + // expectedBytesRead, prevBytesRead) + // } + + // // subsequent queries on the same field results in lesser amount + // // of bytes read because the segment static and dictionary is reused and not + // // loaded from mmap'd filed + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 66 && res.Cost == bytesRead-prevBytesRead { + // t.Fatalf("expected bytes read for query string 66, got %v", + // bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // fuzz := NewFuzzyQuery("hotel") + // fuzz.FieldVal = "reviews.content" + // fuzz.Fuzziness = 2 + // searchRequest = NewSearchRequest(fuzz) + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 8468 && res.Cost == bytesRead-prevBytesRead { + // t.Fatalf("expected bytes read for fuzzy query is 8468, got %v", + // bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // typeFacet := NewFacetRequest("type", 2) + // query = NewQueryStringQuery("united") + // searchRequest = NewSearchRequestOptions(query, int(0), 0, true) + // searchRequest.AddFacet("types", typeFacet) + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if !approxSame(bytesRead-prevBytesRead, 196) && res.Cost == bytesRead-prevBytesRead { + // t.Fatalf("expected bytes read for faceted query is around 196, got %v", + // bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // min := float64(8660) + // max := float64(8665) + // numRangeQuery := NewNumericRangeQuery(&min, &max) + // numRangeQuery.FieldVal = "id" + // searchRequest = NewSearchRequestOptions(numRangeQuery, int(10), 0, true) + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 924 && res.Cost == bytesRead-prevBytesRead { + // t.Fatalf("expected bytes read for numeric range query is 924, got %v", + // bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + // searchRequest.Highlight = &HighlightRequest{} + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 105 && res.Cost == bytesRead-prevBytesRead { + // t.Fatalf("expected bytes read for query with highlighter is 105, got %v", + // bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // disQuery := NewDisjunctionQuery(NewMatchQuery("hotel"), NewMatchQuery("united")) + // searchRequest = NewSearchRequestOptions(disQuery, int(10), 0, true) + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + // // expectation is that the bytes read is roughly equal to sum of sub queries in + // // the disjunction query plus the segment loading portion for the second subquery + // // since it's created afresh and not reused + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 120 && res.Cost == bytesRead-prevBytesRead { + // t.Fatalf("expected bytes read for disjunction query is 120, got %v", + // bytesRead-prevBytesRead) + // } } func TestBytesReadStored(t *testing.T) { - tmpIndexPath := createTmpIndexPath(t) - defer cleanupTmpIndexPath(t, tmpIndexPath) - - indexMapping := NewIndexMapping() - indexMapping.TypeField = "type" - indexMapping.DefaultAnalyzer = "en" - documentMapping := NewDocumentMapping() - indexMapping.AddDocumentMapping("hotel", documentMapping) - - indexMapping.DocValuesDynamic = false - indexMapping.StoreDynamic = false - - contentFieldMapping := NewTextFieldMapping() - contentFieldMapping.Store = true - contentFieldMapping.IncludeInAll = false - contentFieldMapping.IncludeTermVectors = false - - reviewsMapping := NewDocumentMapping() - reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) - documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) - - typeFieldMapping := NewTextFieldMapping() - typeFieldMapping.Store = false - typeFieldMapping.IncludeInAll = false - typeFieldMapping.IncludeTermVectors = false - documentMapping.AddFieldMappingsAt("type", typeFieldMapping) - idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) - if err != nil { - t.Fatal(err) - } - batch, err := getBatchFromData(idx, "sample-data.json") - if err != nil { - t.Fatalf("failed to form a batch %v\n", err) - } - err = idx.Batch(batch) - if err != nil { - t.Fatalf("failed to index batch %v\n", err) - } - query := NewTermQuery("hotel") - query.FieldVal = "reviews.content" - searchRequest := NewSearchRequestOptions(query, int(10), 0, true) - res, err := idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ := idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) - - expectedBytesRead := uint64(11911) - if supportForVectorSearch { - expectedBytesRead = 12321 - } - - if bytesRead != expectedBytesRead && bytesRead == res.Cost { - t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) - } - prevBytesRead := bytesRead - - searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 48 && bytesRead-prevBytesRead == res.Cost { - t.Fatalf("expected the bytes read stat to be around 48, got %v", bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - searchRequest.Fields = []string{"*"} - res, err = idx.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - - if bytesRead-prevBytesRead != 26511 && bytesRead-prevBytesRead == res.Cost { - t.Fatalf("expected the bytes read stat to be around 26511, got %v", - bytesRead-prevBytesRead) - } - idx.Close() - cleanupTmpIndexPath(t, tmpIndexPath) - - // same type of querying but on field "type" - contentFieldMapping.Store = false - typeFieldMapping.Store = true - - tmpIndexPath1 := createTmpIndexPath(t) - defer cleanupTmpIndexPath(t, tmpIndexPath1) - - idx1, err := NewUsing(tmpIndexPath1, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) - if err != nil { - t.Fatal(err) - } - defer func() { - err := idx1.Close() - if err != nil { - t.Fatal(err) - } - }() - - batch, err = getBatchFromData(idx1, "sample-data.json") - if err != nil { - t.Fatalf("failed to form a batch %v\n", err) - } - err = idx1.Batch(batch) - if err != nil { - t.Fatalf("failed to index batch %v\n", err) - } - - query = NewTermQuery("hotel") - query.FieldVal = "type" - searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - res, err = idx1.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - - expectedBytesRead = uint64(4097) - if supportForVectorSearch { - expectedBytesRead = 4507 - } - - if bytesRead != expectedBytesRead && bytesRead == res.Cost { - t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) - } - prevBytesRead = bytesRead - - res, err = idx1.Search(searchRequest) - if err != nil { - t.Error(err) - } - stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 47 && bytesRead-prevBytesRead == res.Cost { - t.Fatalf("expected the bytes read stat to be around 47, got %v", bytesRead-prevBytesRead) - } - prevBytesRead = bytesRead - - searchRequest.Fields = []string{"*"} - res, err = idx1.Search(searchRequest) - if err != nil { - t.Error(err) - } - - stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) - bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - if bytesRead-prevBytesRead != 77 && bytesRead-prevBytesRead == res.Cost { - t.Fatalf("expected the bytes read stat to be around 77, got %v", bytesRead-prevBytesRead) - } + // CHECK num bytes stats + + // tmpIndexPath := createTmpIndexPath(t) + // defer cleanupTmpIndexPath(t, tmpIndexPath) + + // indexMapping := NewIndexMapping() + // indexMapping.TypeField = "type" + // indexMapping.DefaultAnalyzer = "en" + // documentMapping := NewDocumentMapping() + // indexMapping.AddDocumentMapping("hotel", documentMapping) + + // indexMapping.DocValuesDynamic = false + // indexMapping.StoreDynamic = false + + // contentFieldMapping := NewTextFieldMapping() + // contentFieldMapping.Store = true + // contentFieldMapping.IncludeInAll = false + // contentFieldMapping.IncludeTermVectors = false + + // reviewsMapping := NewDocumentMapping() + // reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) + // documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) + + // typeFieldMapping := NewTextFieldMapping() + // typeFieldMapping.Store = false + // typeFieldMapping.IncludeInAll = false + // typeFieldMapping.IncludeTermVectors = false + // documentMapping.AddFieldMappingsAt("type", typeFieldMapping) + // idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) + // if err != nil { + // t.Fatal(err) + // } + // batch, err := getBatchFromData(idx, "sample-data.json") + // if err != nil { + // t.Fatalf("failed to form a batch %v\n", err) + // } + // err = idx.Batch(batch) + // if err != nil { + // t.Fatalf("failed to index batch %v\n", err) + // } + // query := NewTermQuery("hotel") + // query.FieldVal = "reviews.content" + // searchRequest := NewSearchRequestOptions(query, int(10), 0, true) + // res, err := idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ := idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) + + // expectedBytesRead := uint64(11911) + // if supportForVectorSearch { + // expectedBytesRead = 12321 + // } + + // if bytesRead != expectedBytesRead && bytesRead == res.Cost { + // t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) + // } + // prevBytesRead := bytesRead + + // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 48 && bytesRead-prevBytesRead == res.Cost { + // t.Fatalf("expected the bytes read stat to be around 48, got %v", bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + // searchRequest.Fields = []string{"*"} + // res, err = idx.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + + // if bytesRead-prevBytesRead != 26511 && bytesRead-prevBytesRead == res.Cost { + // t.Fatalf("expected the bytes read stat to be around 26511, got %v", + // bytesRead-prevBytesRead) + // } + // idx.Close() + // cleanupTmpIndexPath(t, tmpIndexPath) + + // // same type of querying but on field "type" + // contentFieldMapping.Store = false + // typeFieldMapping.Store = true + + // tmpIndexPath1 := createTmpIndexPath(t) + // defer cleanupTmpIndexPath(t, tmpIndexPath1) + + // idx1, err := NewUsing(tmpIndexPath1, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) + // if err != nil { + // t.Fatal(err) + // } + // defer func() { + // err := idx1.Close() + // if err != nil { + // t.Fatal(err) + // } + // }() + + // batch, err = getBatchFromData(idx1, "sample-data.json") + // if err != nil { + // t.Fatalf("failed to form a batch %v\n", err) + // } + // err = idx1.Batch(batch) + // if err != nil { + // t.Fatalf("failed to index batch %v\n", err) + // } + + // query = NewTermQuery("hotel") + // query.FieldVal = "type" + // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + // res, err = idx1.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + + // expectedBytesRead = uint64(4097) + // if supportForVectorSearch { + // expectedBytesRead = 4507 + // } + + // if bytesRead != expectedBytesRead && bytesRead == res.Cost { + // t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) + // } + // prevBytesRead = bytesRead + + // res, err = idx1.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + // stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 47 && bytesRead-prevBytesRead == res.Cost { + // t.Fatalf("expected the bytes read stat to be around 47, got %v", bytesRead-prevBytesRead) + // } + // prevBytesRead = bytesRead + + // searchRequest.Fields = []string{"*"} + // res, err = idx1.Search(searchRequest) + // if err != nil { + // t.Error(err) + // } + + // stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) + // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + // if bytesRead-prevBytesRead != 77 && bytesRead-prevBytesRead == res.Cost { + // t.Fatalf("expected the bytes read stat to be around 77, got %v", bytesRead-prevBytesRead) + // } } func readDataFromFile(fileName string) ([]map[string]interface{}, error) { @@ -3245,3 +3249,138 @@ func TestFuzzyScoring(t *testing.T) { t.Fatal(err) } } + +// func TestEncr(t *testing.T) { +// tmpIndexPath := createTmpIndexPath(t) +// defer cleanupTmpIndexPath(t, tmpIndexPath) + +// // use default mapping +// mapping := NewIndexMapping() + +// // create a scorch index with default SAFE batches +// var index Index +// index, err = NewUsing(tmpIndexPath, mapping, scorch.Name, scorch.Name, nil) +// if err != nil { +// log.Fatal(err) +// } +// defer func() { +// err := index.Close() +// if err != nil { +// t.Fatal(err) +// } +// }() + +// docs := []map[string]interface{}{ +// { +// "field1": "abc", +// "field2": "def", +// "field3": "ghi", +// }, +// { +// "field1": "jkl", +// "field2": "mno", +// "field3": "pqr", +// }, +// { +// "field1": "stu", +// "field2": "vwx", +// "field3": "yz", +// }, +// } + +// for j := 0; j < 100; j++ { +// batch := index.NewBatch() +// for i, doc := range docs { +// err := batch.Index(fmt.Sprintf("%d", i+j*3), doc) +// if err != nil { +// t.Fatal(err) +// } +// } +// err = index.Batch(batch) +// if err != nil { +// t.Fatal(err) +// } +// } + +// query := NewMatchQuery("abc") +// query.SetField("field1") +// searchRequest := NewSearchRequestOptions(query, 10, 0, true) +// res, err := index.Search(searchRequest) +// if err != nil { +// t.Error(err) +// } +// fmt.Printf("Search result: %+v\n", res) + +// return +// } + +// func TestEncrMerge(t *testing.T) { +// tmpIndexPath := createTmpIndexPath(t) +// defer cleanupTmpIndexPath(t, tmpIndexPath) + +// // use default mapping +// mapping := NewIndexMapping() + +// idx, err := NewUsing(tmpIndexPath, mapping, scorch.Name, scorch.Name, nil) +// if err != nil { +// log.Fatal(err) +// } +// defer func() { +// err := idx.Close() +// if err != nil { +// t.Fatal(err) +// } +// }() +// cbId := 0 +// go func() { +// for { +// time.Sleep(time.Second) +// key := make([]byte, 32) +// if _, err := crand.Read(key); err != nil { +// panic("failed to generate AES key: " + err.Error()) +// } + +// SetNewCallback(fmt.Sprintf("cb-%d", cbId), key) +// cbId++ +// fmt.Println(cbId) +// } +// }() + +// genRandomString := func(length int) string { +// const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +// b := make([]byte, length) +// for i := range b { +// b[i] = charset[rand.Intn(len(charset))] +// } +// return string(b) +// } + +// for i := 0; i < 1000; i++ { +// docs := []map[string]interface{}{ +// { +// "f1": genRandomString(10), +// }, +// { +// "f1": genRandomString(10), +// }, +// } + +// batch := idx.NewBatch() +// for j, doc := range docs { +// err := batch.Index(fmt.Sprintf("%d", i*len(docs)+j), doc) +// if err != nil { +// t.Fatal(err) +// } +// } + +// err = idx.Batch(batch) +// if err != nil { +// t.Fatal(err) +// } +// } + +// sIndex := idx.(*indexImpl).i.(*scorch.Scorch) +// time.Sleep(10 * time.Second) +// sIndex.RemoveCallback(sIndex.GetUsedIds()) // remove the first callback +// time.Sleep(10000 * time.Second) +// } diff --git a/search/searcher/base_test.go b/search/searcher/base_test.go index 6f80bf653..5b84007a5 100644 --- a/search/searcher/base_test.go +++ b/search/searcher/base_test.go @@ -67,8 +67,16 @@ func initTwoDocs(twoDocIndex index.Index) { panic(err) } batch := index.NewBatch() + + // Make a copy of the documents to prevent modification of the + // original slice across multiple tests + docs := []document.Document{} for _, doc := range twoDocIndexDocs { - batch.Update(doc) + docs = append(docs, *doc) + } + + for _, doc := range docs { + batch.Update(&doc) } err = twoDocIndex.Batch(batch) if err != nil { From eec22e4d671f552353370bfce36b40a3e31d42e9 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 23 Jul 2025 11:11:47 +0530 Subject: [PATCH 02/11] MB-65860: Fixed failing test --- index_test.go | 769 +++++++++++++++++++++----------------------------- 1 file changed, 315 insertions(+), 454 deletions(-) diff --git a/index_test.go b/index_test.go index 3516a87ed..7ed27ff86 100644 --- a/index_test.go +++ b/index_test.go @@ -561,328 +561,324 @@ func TestBM25GlobalScoring(t *testing.T) { } func TestBytesRead(t *testing.T) { - // CHECK num bytes stats - - // tmpIndexPath := createTmpIndexPath(t) - // defer cleanupTmpIndexPath(t, tmpIndexPath) - - // indexMapping := NewIndexMapping() - // indexMapping.TypeField = "type" - // indexMapping.DefaultAnalyzer = "en" - // documentMapping := NewDocumentMapping() - // indexMapping.AddDocumentMapping("hotel", documentMapping) - // indexMapping.StoreDynamic = false - // indexMapping.DocValuesDynamic = false - // contentFieldMapping := NewTextFieldMapping() - // contentFieldMapping.Store = false - - // reviewsMapping := NewDocumentMapping() - // reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) - // documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) - - // typeFieldMapping := NewTextFieldMapping() - // typeFieldMapping.Store = false - // documentMapping.AddFieldMappingsAt("type", typeFieldMapping) - - // idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) - // if err != nil { - // t.Fatal(err) - // } - - // defer func() { - // err := idx.Close() - // if err != nil { - // t.Fatal(err) - // } - // }() - - // batch, err := getBatchFromData(idx, "sample-data.json") - // if err != nil { - // t.Fatalf("failed to form a batch") - // } - // err = idx.Batch(batch) - // if err != nil { - // t.Fatalf("failed to index batch %v\n", err) - // } - // query := NewQueryStringQuery("united") - // searchRequest := NewSearchRequestOptions(query, int(10), 0, true) - - // res, err := idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - // stats, _ := idx.StatsMap()["index"].(map[string]interface{}) - // prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) - - // expectedBytesRead := uint64(22049) - // if supportForVectorSearch { - // expectedBytesRead = 22459 - // } - - // if prevBytesRead != expectedBytesRead && res.Cost == prevBytesRead { - // t.Fatalf("expected bytes read for query string %v, got %v", - // expectedBytesRead, prevBytesRead) - // } - - // // subsequent queries on the same field results in lesser amount - // // of bytes read because the segment static and dictionary is reused and not - // // loaded from mmap'd filed - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 66 && res.Cost == bytesRead-prevBytesRead { - // t.Fatalf("expected bytes read for query string 66, got %v", - // bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // fuzz := NewFuzzyQuery("hotel") - // fuzz.FieldVal = "reviews.content" - // fuzz.Fuzziness = 2 - // searchRequest = NewSearchRequest(fuzz) - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 8468 && res.Cost == bytesRead-prevBytesRead { - // t.Fatalf("expected bytes read for fuzzy query is 8468, got %v", - // bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // typeFacet := NewFacetRequest("type", 2) - // query = NewQueryStringQuery("united") - // searchRequest = NewSearchRequestOptions(query, int(0), 0, true) - // searchRequest.AddFacet("types", typeFacet) - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if !approxSame(bytesRead-prevBytesRead, 196) && res.Cost == bytesRead-prevBytesRead { - // t.Fatalf("expected bytes read for faceted query is around 196, got %v", - // bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // min := float64(8660) - // max := float64(8665) - // numRangeQuery := NewNumericRangeQuery(&min, &max) - // numRangeQuery.FieldVal = "id" - // searchRequest = NewSearchRequestOptions(numRangeQuery, int(10), 0, true) - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 924 && res.Cost == bytesRead-prevBytesRead { - // t.Fatalf("expected bytes read for numeric range query is 924, got %v", - // bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - // searchRequest.Highlight = &HighlightRequest{} - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 105 && res.Cost == bytesRead-prevBytesRead { - // t.Fatalf("expected bytes read for query with highlighter is 105, got %v", - // bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // disQuery := NewDisjunctionQuery(NewMatchQuery("hotel"), NewMatchQuery("united")) - // searchRequest = NewSearchRequestOptions(disQuery, int(10), 0, true) - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - // // expectation is that the bytes read is roughly equal to sum of sub queries in - // // the disjunction query plus the segment loading portion for the second subquery - // // since it's created afresh and not reused - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 120 && res.Cost == bytesRead-prevBytesRead { - // t.Fatalf("expected bytes read for disjunction query is 120, got %v", - // bytesRead-prevBytesRead) - // } + tmpIndexPath := createTmpIndexPath(t) + defer cleanupTmpIndexPath(t, tmpIndexPath) + + indexMapping := NewIndexMapping() + indexMapping.TypeField = "type" + indexMapping.DefaultAnalyzer = "en" + documentMapping := NewDocumentMapping() + indexMapping.AddDocumentMapping("hotel", documentMapping) + indexMapping.StoreDynamic = false + indexMapping.DocValuesDynamic = false + contentFieldMapping := NewTextFieldMapping() + contentFieldMapping.Store = false + + reviewsMapping := NewDocumentMapping() + reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) + documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) + + typeFieldMapping := NewTextFieldMapping() + typeFieldMapping.Store = false + documentMapping.AddFieldMappingsAt("type", typeFieldMapping) + + idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) + if err != nil { + t.Fatal(err) + } + + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + batch, err := getBatchFromData(idx, "sample-data.json") + if err != nil { + t.Fatalf("failed to form a batch") + } + err = idx.Batch(batch) + if err != nil { + t.Fatalf("failed to index batch %v\n", err) + } + query := NewQueryStringQuery("united") + searchRequest := NewSearchRequestOptions(query, int(10), 0, true) + + res, err := idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + stats, _ := idx.StatsMap()["index"].(map[string]interface{}) + prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) + + expectedBytesRead := uint64(22049) + if supportForVectorSearch { + expectedBytesRead = 22459 + } + + if prevBytesRead != expectedBytesRead && res.Cost == prevBytesRead { + t.Fatalf("expected bytes read for query string %v, got %v", + expectedBytesRead, prevBytesRead) + } + + // subsequent queries on the same field results in lesser amount + // of bytes read because the segment static and dictionary is reused and not + // loaded from mmap'd filed + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 66 && res.Cost == bytesRead-prevBytesRead { + t.Fatalf("expected bytes read for query string 66, got %v", + bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + fuzz := NewFuzzyQuery("hotel") + fuzz.FieldVal = "reviews.content" + fuzz.Fuzziness = 2 + searchRequest = NewSearchRequest(fuzz) + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 8468 && res.Cost == bytesRead-prevBytesRead { + t.Fatalf("expected bytes read for fuzzy query is 8468, got %v", + bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + typeFacet := NewFacetRequest("type", 2) + query = NewQueryStringQuery("united") + searchRequest = NewSearchRequestOptions(query, int(0), 0, true) + searchRequest.AddFacet("types", typeFacet) + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if !approxSame(bytesRead-prevBytesRead, 196) && res.Cost == bytesRead-prevBytesRead { + t.Fatalf("expected bytes read for faceted query is around 196, got %v", + bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + min := float64(8660) + max := float64(8665) + numRangeQuery := NewNumericRangeQuery(&min, &max) + numRangeQuery.FieldVal = "id" + searchRequest = NewSearchRequestOptions(numRangeQuery, int(10), 0, true) + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 924 && res.Cost == bytesRead-prevBytesRead { + t.Fatalf("expected bytes read for numeric range query is 924, got %v", + bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + searchRequest.Highlight = &HighlightRequest{} + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 105 && res.Cost == bytesRead-prevBytesRead { + t.Fatalf("expected bytes read for query with highlighter is 105, got %v", + bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + disQuery := NewDisjunctionQuery(NewMatchQuery("hotel"), NewMatchQuery("united")) + searchRequest = NewSearchRequestOptions(disQuery, int(10), 0, true) + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + // expectation is that the bytes read is roughly equal to sum of sub queries in + // the disjunction query plus the segment loading portion for the second subquery + // since it's created afresh and not reused + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 120 && res.Cost == bytesRead-prevBytesRead { + t.Fatalf("expected bytes read for disjunction query is 120, got %v", + bytesRead-prevBytesRead) + } } func TestBytesReadStored(t *testing.T) { - // CHECK num bytes stats - - // tmpIndexPath := createTmpIndexPath(t) - // defer cleanupTmpIndexPath(t, tmpIndexPath) - - // indexMapping := NewIndexMapping() - // indexMapping.TypeField = "type" - // indexMapping.DefaultAnalyzer = "en" - // documentMapping := NewDocumentMapping() - // indexMapping.AddDocumentMapping("hotel", documentMapping) - - // indexMapping.DocValuesDynamic = false - // indexMapping.StoreDynamic = false - - // contentFieldMapping := NewTextFieldMapping() - // contentFieldMapping.Store = true - // contentFieldMapping.IncludeInAll = false - // contentFieldMapping.IncludeTermVectors = false - - // reviewsMapping := NewDocumentMapping() - // reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) - // documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) - - // typeFieldMapping := NewTextFieldMapping() - // typeFieldMapping.Store = false - // typeFieldMapping.IncludeInAll = false - // typeFieldMapping.IncludeTermVectors = false - // documentMapping.AddFieldMappingsAt("type", typeFieldMapping) - // idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) - // if err != nil { - // t.Fatal(err) - // } - // batch, err := getBatchFromData(idx, "sample-data.json") - // if err != nil { - // t.Fatalf("failed to form a batch %v\n", err) - // } - // err = idx.Batch(batch) - // if err != nil { - // t.Fatalf("failed to index batch %v\n", err) - // } - // query := NewTermQuery("hotel") - // query.FieldVal = "reviews.content" - // searchRequest := NewSearchRequestOptions(query, int(10), 0, true) - // res, err := idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ := idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) - - // expectedBytesRead := uint64(11911) - // if supportForVectorSearch { - // expectedBytesRead = 12321 - // } - - // if bytesRead != expectedBytesRead && bytesRead == res.Cost { - // t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) - // } - // prevBytesRead := bytesRead - - // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 48 && bytesRead-prevBytesRead == res.Cost { - // t.Fatalf("expected the bytes read stat to be around 48, got %v", bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - // searchRequest.Fields = []string{"*"} - // res, err = idx.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ = idx.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - - // if bytesRead-prevBytesRead != 26511 && bytesRead-prevBytesRead == res.Cost { - // t.Fatalf("expected the bytes read stat to be around 26511, got %v", - // bytesRead-prevBytesRead) - // } - // idx.Close() - // cleanupTmpIndexPath(t, tmpIndexPath) - - // // same type of querying but on field "type" - // contentFieldMapping.Store = false - // typeFieldMapping.Store = true - - // tmpIndexPath1 := createTmpIndexPath(t) - // defer cleanupTmpIndexPath(t, tmpIndexPath1) - - // idx1, err := NewUsing(tmpIndexPath1, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) - // if err != nil { - // t.Fatal(err) - // } - // defer func() { - // err := idx1.Close() - // if err != nil { - // t.Fatal(err) - // } - // }() - - // batch, err = getBatchFromData(idx1, "sample-data.json") - // if err != nil { - // t.Fatalf("failed to form a batch %v\n", err) - // } - // err = idx1.Batch(batch) - // if err != nil { - // t.Fatalf("failed to index batch %v\n", err) - // } - - // query = NewTermQuery("hotel") - // query.FieldVal = "type" - // searchRequest = NewSearchRequestOptions(query, int(10), 0, true) - // res, err = idx1.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - - // expectedBytesRead = uint64(4097) - // if supportForVectorSearch { - // expectedBytesRead = 4507 - // } - - // if bytesRead != expectedBytesRead && bytesRead == res.Cost { - // t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) - // } - // prevBytesRead = bytesRead - - // res, err = idx1.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - // stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 47 && bytesRead-prevBytesRead == res.Cost { - // t.Fatalf("expected the bytes read stat to be around 47, got %v", bytesRead-prevBytesRead) - // } - // prevBytesRead = bytesRead - - // searchRequest.Fields = []string{"*"} - // res, err = idx1.Search(searchRequest) - // if err != nil { - // t.Error(err) - // } - - // stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) - // bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) - // if bytesRead-prevBytesRead != 77 && bytesRead-prevBytesRead == res.Cost { - // t.Fatalf("expected the bytes read stat to be around 77, got %v", bytesRead-prevBytesRead) - // } + tmpIndexPath := createTmpIndexPath(t) + defer cleanupTmpIndexPath(t, tmpIndexPath) + + indexMapping := NewIndexMapping() + indexMapping.TypeField = "type" + indexMapping.DefaultAnalyzer = "en" + documentMapping := NewDocumentMapping() + indexMapping.AddDocumentMapping("hotel", documentMapping) + + indexMapping.DocValuesDynamic = false + indexMapping.StoreDynamic = false + + contentFieldMapping := NewTextFieldMapping() + contentFieldMapping.Store = true + contentFieldMapping.IncludeInAll = false + contentFieldMapping.IncludeTermVectors = false + + reviewsMapping := NewDocumentMapping() + reviewsMapping.AddFieldMappingsAt("content", contentFieldMapping) + documentMapping.AddSubDocumentMapping("reviews", reviewsMapping) + + typeFieldMapping := NewTextFieldMapping() + typeFieldMapping.Store = false + typeFieldMapping.IncludeInAll = false + typeFieldMapping.IncludeTermVectors = false + documentMapping.AddFieldMappingsAt("type", typeFieldMapping) + idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) + if err != nil { + t.Fatal(err) + } + batch, err := getBatchFromData(idx, "sample-data.json") + if err != nil { + t.Fatalf("failed to form a batch %v\n", err) + } + err = idx.Batch(batch) + if err != nil { + t.Fatalf("failed to index batch %v\n", err) + } + query := NewTermQuery("hotel") + query.FieldVal = "reviews.content" + searchRequest := NewSearchRequestOptions(query, int(10), 0, true) + res, err := idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ := idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64) + + expectedBytesRead := uint64(11911) + if supportForVectorSearch { + expectedBytesRead = 12321 + } + + if bytesRead != expectedBytesRead && bytesRead == res.Cost { + t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) + } + prevBytesRead := bytesRead + + searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 48 && bytesRead-prevBytesRead == res.Cost { + t.Fatalf("expected the bytes read stat to be around 48, got %v", bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + searchRequest.Fields = []string{"*"} + res, err = idx.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ = idx.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + + if bytesRead-prevBytesRead != 26511 && bytesRead-prevBytesRead == res.Cost { + t.Fatalf("expected the bytes read stat to be around 26511, got %v", + bytesRead-prevBytesRead) + } + idx.Close() + cleanupTmpIndexPath(t, tmpIndexPath) + + // same type of querying but on field "type" + contentFieldMapping.Store = false + typeFieldMapping.Store = true + + tmpIndexPath1 := createTmpIndexPath(t) + defer cleanupTmpIndexPath(t, tmpIndexPath1) + + idx1, err := NewUsing(tmpIndexPath1, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil) + if err != nil { + t.Fatal(err) + } + defer func() { + err := idx1.Close() + if err != nil { + t.Fatal(err) + } + }() + + batch, err = getBatchFromData(idx1, "sample-data.json") + if err != nil { + t.Fatalf("failed to form a batch %v\n", err) + } + err = idx1.Batch(batch) + if err != nil { + t.Fatalf("failed to index batch %v\n", err) + } + + query = NewTermQuery("hotel") + query.FieldVal = "type" + searchRequest = NewSearchRequestOptions(query, int(10), 0, true) + res, err = idx1.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + + expectedBytesRead = uint64(4097) + if supportForVectorSearch { + expectedBytesRead = 4507 + } + + if bytesRead != expectedBytesRead && bytesRead == res.Cost { + t.Fatalf("expected the bytes read stat to be around %v, got %v", expectedBytesRead, bytesRead) + } + prevBytesRead = bytesRead + + res, err = idx1.Search(searchRequest) + if err != nil { + t.Error(err) + } + stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 47 && bytesRead-prevBytesRead == res.Cost { + t.Fatalf("expected the bytes read stat to be around 47, got %v", bytesRead-prevBytesRead) + } + prevBytesRead = bytesRead + + searchRequest.Fields = []string{"*"} + res, err = idx1.Search(searchRequest) + if err != nil { + t.Error(err) + } + + stats, _ = idx1.StatsMap()["index"].(map[string]interface{}) + bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64) + if bytesRead-prevBytesRead != 77 && bytesRead-prevBytesRead == res.Cost { + t.Fatalf("expected the bytes read stat to be around 77, got %v", bytesRead-prevBytesRead) + } } func readDataFromFile(fileName string) ([]map[string]interface{}, error) { @@ -3249,138 +3245,3 @@ func TestFuzzyScoring(t *testing.T) { t.Fatal(err) } } - -// func TestEncr(t *testing.T) { -// tmpIndexPath := createTmpIndexPath(t) -// defer cleanupTmpIndexPath(t, tmpIndexPath) - -// // use default mapping -// mapping := NewIndexMapping() - -// // create a scorch index with default SAFE batches -// var index Index -// index, err = NewUsing(tmpIndexPath, mapping, scorch.Name, scorch.Name, nil) -// if err != nil { -// log.Fatal(err) -// } -// defer func() { -// err := index.Close() -// if err != nil { -// t.Fatal(err) -// } -// }() - -// docs := []map[string]interface{}{ -// { -// "field1": "abc", -// "field2": "def", -// "field3": "ghi", -// }, -// { -// "field1": "jkl", -// "field2": "mno", -// "field3": "pqr", -// }, -// { -// "field1": "stu", -// "field2": "vwx", -// "field3": "yz", -// }, -// } - -// for j := 0; j < 100; j++ { -// batch := index.NewBatch() -// for i, doc := range docs { -// err := batch.Index(fmt.Sprintf("%d", i+j*3), doc) -// if err != nil { -// t.Fatal(err) -// } -// } -// err = index.Batch(batch) -// if err != nil { -// t.Fatal(err) -// } -// } - -// query := NewMatchQuery("abc") -// query.SetField("field1") -// searchRequest := NewSearchRequestOptions(query, 10, 0, true) -// res, err := index.Search(searchRequest) -// if err != nil { -// t.Error(err) -// } -// fmt.Printf("Search result: %+v\n", res) - -// return -// } - -// func TestEncrMerge(t *testing.T) { -// tmpIndexPath := createTmpIndexPath(t) -// defer cleanupTmpIndexPath(t, tmpIndexPath) - -// // use default mapping -// mapping := NewIndexMapping() - -// idx, err := NewUsing(tmpIndexPath, mapping, scorch.Name, scorch.Name, nil) -// if err != nil { -// log.Fatal(err) -// } -// defer func() { -// err := idx.Close() -// if err != nil { -// t.Fatal(err) -// } -// }() -// cbId := 0 -// go func() { -// for { -// time.Sleep(time.Second) -// key := make([]byte, 32) -// if _, err := crand.Read(key); err != nil { -// panic("failed to generate AES key: " + err.Error()) -// } - -// SetNewCallback(fmt.Sprintf("cb-%d", cbId), key) -// cbId++ -// fmt.Println(cbId) -// } -// }() - -// genRandomString := func(length int) string { -// const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" -// b := make([]byte, length) -// for i := range b { -// b[i] = charset[rand.Intn(len(charset))] -// } -// return string(b) -// } - -// for i := 0; i < 1000; i++ { -// docs := []map[string]interface{}{ -// { -// "f1": genRandomString(10), -// }, -// { -// "f1": genRandomString(10), -// }, -// } - -// batch := idx.NewBatch() -// for j, doc := range docs { -// err := batch.Index(fmt.Sprintf("%d", i*len(docs)+j), doc) -// if err != nil { -// t.Fatal(err) -// } -// } - -// err = idx.Batch(batch) -// if err != nil { -// t.Fatal(err) -// } -// } - -// sIndex := idx.(*indexImpl).i.(*scorch.Scorch) -// time.Sleep(10 * time.Second) -// sIndex.RemoveCallback(sIndex.GetUsedIds()) // remove the first callback -// time.Sleep(10000 * time.Second) -// } From 2c4d76a742df424572354bbc9bb347416dd67794 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 23 Jul 2025 15:16:45 +0530 Subject: [PATCH 03/11] MB-65680: Added callbacks for index_meta.json and bolt --- builder.go | 5 +- callbacks.go | 65 ++++++++++ index/scorch/builder.go | 2 +- index/scorch/persister.go | 42 +++++- index/scorch/scorch.go | 21 +++ index/scorch/snapshot_index.go | 2 +- index_impl.go | 38 ++++-- index_meta.go | 53 +++++++- index_meta_test.go | 11 +- util/callbacks.go | 228 +++++++++++++++++++++++++++++++++ 10 files changed, 433 insertions(+), 34 deletions(-) create mode 100644 util/callbacks.go diff --git a/builder.go b/builder.go index f170317ee..8ef2adc6e 100644 --- a/builder.go +++ b/builder.go @@ -74,7 +74,10 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int // do not use real config, as these are options for the builder, // not the resulting index meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{}) - err = meta.Save(path) + + // CHECK where this is used + writer := &util.FileWriter{} + err = meta.Save(path, writer) if err != nil { return nil, err } diff --git a/callbacks.go b/callbacks.go index 8960e19b3..1bee60dfc 100644 --- a/callbacks.go +++ b/callbacks.go @@ -129,3 +129,68 @@ func init() { // cbLock.Unlock() // } // } + +type fileWriter struct { + writerCB func(data, counter []byte) ([]byte, error) + counter []byte + id string +} + +func NewFileWriter() (*fileWriter, error) { + var err error + rv := &fileWriter{} + rv.id, rv.writerCB, err = WriterCallbackGetter() + if err != nil { + return nil, err + } + rv.counter, err = CounterGetter() + if err != nil { + return nil, err + } + + return rv, nil +} + +func (w *fileWriter) Process(data []byte) ([]byte, error) { + if w.writerCB != nil { + w.incrementCounter() + return w.writerCB(data, w.counter) + } + return data, nil +} + +func (w *fileWriter) incrementCounter() { + if w.counter != nil { + for i := len(w.counter) - 1; i >= 0; i-- { + if w.counter[i] < 255 { + w.counter[i]++ + return + } + w.counter[i] = 0 + } + } +} + +type fileReader struct { + readerCB func(data []byte) ([]byte, error) + id string +} + +func NewFileReader(cbId string) (*fileReader, error) { + readerCB, err := ReaderCallbackGetter(cbId) + if err != nil { + return nil, err + } + + return &fileReader{ + readerCB: readerCB, + id: cbId, + }, nil +} + +func (r *fileReader) Process(data []byte) ([]byte, error) { + if r.readerCB != nil { + return r.readerCB(data) + } + return data, nil +} diff --git a/index/scorch/builder.go b/index/scorch/builder.go index d4d8e9c07..bc5ec9e9d 100644 --- a/index/scorch/builder.go +++ b/index/scorch/builder.go @@ -303,7 +303,7 @@ func (o *Builder) Close() error { } // fill the root bolt with this fake index snapshot - _, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil, nil) + _, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil, nil, nil) if err != nil { _ = tx.Rollback() _ = rootBolt.Close() diff --git a/index/scorch/persister.go b/index/scorch/persister.go index d92c3a85b..4a8a74817 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -606,8 +606,12 @@ func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory, } func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, - segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory) ( + segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory, + writer *util.FileWriter) ( []string, map[uint64]string, error) { + if writer == nil { + writer = &util.FileWriter{} + } snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket) if err != nil { return nil, nil, err @@ -655,7 +659,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, } // TODO optimize writing these in order? for k, v := range snapshot.internal { - err = internalBucket.Put([]byte(k), v) + buf, err := writer.Process(v) + if err != nil { + return nil, nil, err + } + err = internalBucket.Put([]byte(k), buf) if err != nil { return nil, nil, err } @@ -721,7 +729,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, if err != nil { return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err) } - err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBuf.Bytes()) + roaringBytes, err := writer.Process(roaringBuf.Bytes()) + if err != nil { + return nil, nil, err + } + err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBytes) if err != nil { return nil, nil, err } @@ -733,7 +745,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, if err != nil { return nil, nil, err } - err = snapshotSegmentBucket.Put(util.BoltStatsKey, b) + statsBytes, err := writer.Process(b) + if err != nil { + return nil, nil, err + } + err = snapshotSegmentBucket.Put(util.BoltStatsKey, statsBytes) if err != nil { return nil, nil, err } @@ -768,7 +784,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot, exclude map[uint } }() - filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, exclude, nil) + filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, exclude, nil, s.writer) if err != nil { return err } @@ -963,7 +979,10 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { return nil, fmt.Errorf("internal bucket missing") } err := internalBucket.ForEach(func(key []byte, val []byte) error { - copiedVal := append([]byte(nil), val...) + copiedVal, err := s.reader.Process(append([]byte(nil), val...)) + if err != nil { + return err + } rv.internal[string(key)] = copiedVal return nil }) @@ -1017,6 +1036,11 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro } deletedBytes := segmentBucket.Get(util.BoltDeletedKey) if deletedBytes != nil { + deletedBytes, err = s.reader.Process(deletedBytes) + if err != nil { + _ = segment.Close() + return nil, err + } deletedBitmap := roaring.NewBitmap() r := bytes.NewReader(deletedBytes) _, err := deletedBitmap.ReadFrom(r) @@ -1031,7 +1055,11 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro statBytes := segmentBucket.Get(util.BoltStatsKey) if statBytes != nil { var statsMap map[string]map[string]uint64 - + statBytes, err = s.reader.Process(statBytes) + if err != nil { + _ = segment.Close() + return nil, err + } err := json.Unmarshal(statBytes, &statsMap) stats := &fieldStats{statMap: statsMap} if err != nil { diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 2e26e39df..99aabce26 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -80,6 +80,9 @@ type Scorch struct { rootBolt *bolt.DB asyncTasks sync.WaitGroup + writer *util.FileWriter + reader *util.FileReader + onEvent func(event Event) bool onAsyncError func(err error, path string) @@ -163,6 +166,24 @@ func NewScorch(storeName string, if ok { rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName] } + writerId, ok := config["writerId"].(string) + var writer *util.FileWriter + if ok { + writer, err = util.NewFileWriterWithId(writerId) + } else { + writer, err = util.NewFileWriter() + } + if err != nil { + return nil, err + } + rv.writer = writer + + reader, err := util.NewFileReader(rv.writer.Id()) + if err != nil { + return nil, err + } + rv.reader = reader + // validate any custom persistor options to // prevent an async error in the persistor routine _, err = rv.parsePersisterOptions() diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index c09a7db40..6c989dfe2 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -1018,7 +1018,7 @@ func (is *IndexSnapshot) CopyTo(d index.Directory) error { return err } - _, _, err = prepareBoltSnapshot(is, tx, "", is.parent.segPlugin, nil, d) + _, _, err = prepareBoltSnapshot(is, tx, "", is.parent.segPlugin, nil, d, nil) if err != nil { _ = tx.Rollback() return fmt.Errorf("error backing up index snapshot: %v", err) diff --git a/index_impl.go b/index_impl.go index a43b3cf75..84026ad68 100644 --- a/index_impl.go +++ b/index_impl.go @@ -45,14 +45,16 @@ import ( ) type indexImpl struct { - path string - name string - meta *indexMeta - i index.Index - m mapping.IndexMapping - mutex sync.RWMutex - open bool - stats *IndexStat + path string + name string + meta *indexMeta + i index.Index + m mapping.IndexMapping + mutex sync.RWMutex + open bool + stats *IndexStat + writer *util.FileWriter + reader *util.FileReader } const storePath = "store" @@ -88,22 +90,29 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, return nil, fmt.Errorf("bleve not configured for file based indexing") } + fileWriter, err := util.NewFileWriter() + if err != nil { + return nil, err + } + rv := indexImpl{ - path: path, - name: path, - m: mapping, - meta: newIndexMeta(indexType, kvstore, kvconfig), + path: path, + name: path, + m: mapping, + meta: newIndexMeta(indexType, kvstore, kvconfig), + writer: fileWriter, } rv.stats = &IndexStat{i: &rv} // at this point there is hope that we can be successful, so save index meta if path != "" { - err = rv.meta.Save(path) + err = rv.meta.Save(path, rv.writer) if err != nil { return nil, err } kvconfig["create_if_missing"] = true kvconfig["error_if_exists"] = true kvconfig["path"] = indexStorePath(path) + kvconfig["callback_id"] = rv.writer.Id() } else { kvconfig["path"] = "" } @@ -153,7 +162,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde } rv.stats = &IndexStat{i: rv} - rv.meta, err = openIndexMeta(path) + rv.meta, rv.reader, err = openIndexMeta(path) if err != nil { return nil, err } @@ -174,6 +183,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde storeConfig["path"] = indexStorePath(path) storeConfig["create_if_missing"] = false storeConfig["error_if_exists"] = false + storeConfig["callback_id"] = rv.reader.Id() for rck, rcv := range runtimeConfig { storeConfig[rck] = rcv if rck == "updated_mapping" { diff --git a/index_meta.go b/index_meta.go index 14b88dcbc..72bbe624a 100644 --- a/index_meta.go +++ b/index_meta.go @@ -15,6 +15,7 @@ package bleve import ( + "encoding/binary" "fmt" "os" "path/filepath" @@ -40,27 +41,50 @@ func newIndexMeta(indexType string, storage string, config map[string]interface{ } } -func openIndexMeta(path string) (*indexMeta, error) { +func openIndexMeta(path string) (*indexMeta, *util.FileReader, error) { if _, err := os.Stat(path); os.IsNotExist(err) { - return nil, ErrorIndexPathDoesNotExist + return nil, nil, ErrorIndexPathDoesNotExist } indexMetaPath := indexMetaPath(path) metaBytes, err := os.ReadFile(indexMetaPath) if err != nil { - return nil, ErrorIndexMetaMissing + return nil, nil, ErrorIndexMetaMissing } + var im indexMeta + fileReader := &util.FileReader{} err = util.UnmarshalJSON(metaBytes, &im) if err != nil { - return nil, ErrorIndexMetaCorrupt + if len(metaBytes) < 4 { + return nil, nil, ErrorIndexMetaCorrupt + } + + pos := len(metaBytes) - 4 + writerIdLen := int(binary.BigEndian.Uint32(metaBytes[pos:])) + pos -= writerIdLen + if pos < 0 { + return nil, nil, ErrorIndexMetaCorrupt + } + + writerId := metaBytes[pos : pos+writerIdLen] + fileReader, err = util.NewFileReader(string(writerId)) + if err != nil { + return nil, nil, err + } + pos -= len(writerId) + err = util.UnmarshalJSON(metaBytes[0:pos], &im) + if err != nil { + return nil, nil, ErrorIndexMetaCorrupt + } } + if im.IndexType == "" { im.IndexType = upsidedown.Name } - return &im, nil + return &im, fileReader, nil } -func (i *indexMeta) Save(path string) (err error) { +func (i *indexMeta) Save(path string, writer *util.FileWriter) (err error) { indexMetaPath := indexMetaPath(path) // ensure any necessary parent directories exist err = os.MkdirAll(path, 0700) @@ -86,10 +110,27 @@ func (i *indexMeta) Save(path string) (err error) { err = ierr } }() + + metaBytes, err = writer.Process(metaBytes) + if err != nil { + return err + } + _, err = indexMetaFile.Write(metaBytes) if err != nil { return err } + + _, err = indexMetaFile.Write([]byte(writer.Id())) + if err != nil { + return err + } + + err = binary.Write(indexMetaFile, binary.BigEndian, uint32(len(writer.Id()))) + if err != nil { + return err + } + return nil } diff --git a/index_meta_test.go b/index_meta_test.go index 7719f577b..d2fa5a805 100644 --- a/index_meta_test.go +++ b/index_meta_test.go @@ -17,6 +17,8 @@ package bleve import ( "os" "testing" + + "github.com/blevesearch/bleve/v2/util" ) func TestIndexMeta(t *testing.T) { @@ -29,21 +31,22 @@ func TestIndexMeta(t *testing.T) { }() // open non-existent meta should give an error - _, err := openIndexMeta(testIndexPath) + _, _, err := openIndexMeta(testIndexPath) if err == nil { t.Errorf("expected error, got nil") } + writer := &util.FileWriter{} // create meta im := &indexMeta{Storage: "boltdb"} - err = im.Save(testIndexPath) + err = im.Save(testIndexPath, writer) if err != nil { t.Error(err) } im = nil // open a meta that exists - im, err = openIndexMeta(testIndexPath) + im, _, err = openIndexMeta(testIndexPath) if err != nil { t.Error(err) } @@ -52,7 +55,7 @@ func TestIndexMeta(t *testing.T) { } // save a meta that already exists - err = im.Save(testIndexPath) + err = im.Save(testIndexPath, writer) if err == nil { t.Errorf("expected error, got nil") } diff --git a/util/callbacks.go b/util/callbacks.go new file mode 100644 index 000000000..b24a76f5a --- /dev/null +++ b/util/callbacks.go @@ -0,0 +1,228 @@ +package util + +import ( + zapv16 "github.com/blevesearch/zapx/v16" +) + +// Variables used for development and testing purposes +// var keys = map[string][]byte{} +// var cbLock = sync.RWMutex{} +// var latestCallbackId string + +var WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { + return "", func(data, counter []byte) ([]byte, error) { + return data, nil + }, nil +} + +var WriterCallbackGetterWithId = func(cbId string) (func(data, counter []byte) ([]byte, error), error) { + return func(data, counter []byte) ([]byte, error) { + return data, nil + }, nil +} + +var ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { + return func(data []byte) ([]byte, error) { + return data, nil + }, nil +} + +var CounterGetter = func() ([]byte, error) { + return nil, nil +} + +func init() { + // Variables used for development and testing purposes + // encryptionKey := make([]byte, 32) + // if _, err := rand.Read(encryptionKey); err != nil { + // panic("failed to generate AES key: " + err.Error()) + // } + + // latestCallbackId = "exampleCallback" + // keys[latestCallbackId] = encryptionKey + + // latestCallbackId = "exampleCallback" + + // WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { + // cbLock.RLock() + // if latestCallbackId == "" { + // return "", func(data []byte, _ []byte) ([]byte, error) { + // return data, nil + // }, nil + // } + // keyCopy := make([]byte, 32) + // keyIdCopy := latestCallbackId + // if key, exists := keys[latestCallbackId]; exists { + // copy(keyCopy, key) + // } + // cbLock.RUnlock() + + // block, err := aes.NewCipher(keyCopy) + // if err != nil { + // return "", nil, err + // } + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // return "", nil, err + // } + + // return keyIdCopy, func(data, counter []byte) ([]byte, error) { + // ciphertext := aesgcm.Seal(nil, counter, data, nil) + // result := append(ciphertext, counter...) + // return result, nil + // }, nil + // } + + // ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { + // cbLock.RLock() + // keyCopy := make([]byte, 32) + // if key, exists := keys[cbId]; exists { + // copy(keyCopy, key) + // } + // cbLock.RUnlock() + + // if len(keyCopy) == 0 { + // return func(data []byte) ([]byte, error) { + // return data, nil + // }, nil + // } else { + // block, err := aes.NewCipher(keyCopy) + // if err != nil { + // return nil, err + // } + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // return nil, err + // } + + // return func(data []byte) ([]byte, error) { + // if len(data) < 12 { + // return nil, fmt.Errorf("ciphertext too short") + // } + + // nonce := data[len(data)-12:] + // ciphertext := data[:len(data)-12] + + // plaintext, err := aesgcm.Open(nil, nonce, ciphertext, nil) + // if err != nil { + // return nil, fmt.Errorf("decryption failed: %w", err) + // } + + // return plaintext, nil + // }, nil + // } + // } + + // CounterGetter = func() ([]byte, error) { + // nonce := make([]byte, 12) // GCM standard + // if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + // return nil, err + // } + // return nonce, nil + // } + + zapv16.WriterCallbackGetter = WriterCallbackGetter + zapv16.ReaderCallbackGetter = ReaderCallbackGetter + zapv16.CounterGetter = CounterGetter +} + +// Function used for development and testing purposes +// func SetNewCallback(callbackId string, key []byte) { +// if callbackId != "" { +// cbLock.Lock() +// keys[callbackId] = key +// latestCallbackId = callbackId +// cbLock.Unlock() +// } +// } + +type FileWriter struct { + writerCB func(data, counter []byte) ([]byte, error) + counter []byte + id string +} + +func NewFileWriter() (*FileWriter, error) { + var err error + rv := &FileWriter{} + rv.id, rv.writerCB, err = WriterCallbackGetter() + if err != nil { + return nil, err + } + rv.counter, err = CounterGetter() + if err != nil { + return nil, err + } + + return rv, nil +} + +func NewFileWriterWithId(cbId string) (*FileWriter, error) { + writerCB, err := WriterCallbackGetterWithId(cbId) + if err != nil { + return nil, err + } + + counter, err := CounterGetter() + if err != nil { + return nil, err + } + + return &FileWriter{ + writerCB: writerCB, + counter: counter, + id: cbId, + }, nil +} + +func (w *FileWriter) Process(data []byte) ([]byte, error) { + if w.writerCB != nil { + w.incrementCounter() + return w.writerCB(data, w.counter) + } + return data, nil +} + +func (w *FileWriter) incrementCounter() { + if w.counter != nil { + for i := len(w.counter) - 1; i >= 0; i-- { + if w.counter[i] < 255 { + w.counter[i]++ + return + } + w.counter[i] = 0 + } + } +} + +func (w *FileWriter) Id() string { + return w.id +} + +type FileReader struct { + readerCB func(data []byte) ([]byte, error) + id string +} + +func NewFileReader(cbId string) (*FileReader, error) { + readerCB, err := ReaderCallbackGetter(cbId) + if err != nil { + return nil, err + } + + return &FileReader{ + readerCB: readerCB, + id: cbId, + }, nil +} + +func (r *FileReader) Process(data []byte) ([]byte, error) { + if r.readerCB != nil { + return r.readerCB(data) + } + return data, nil +} + +func (r *FileReader) Id() string { + return r.id +} From e71a8f39ec5cdadea0cae32b5f9c16b0944920bb Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 23 Jul 2025 15:19:17 +0530 Subject: [PATCH 04/11] MB-65680: Removed duplicate file --- callbacks.go | 196 --------------------------------------------------- 1 file changed, 196 deletions(-) delete mode 100644 callbacks.go diff --git a/callbacks.go b/callbacks.go deleted file mode 100644 index 1bee60dfc..000000000 --- a/callbacks.go +++ /dev/null @@ -1,196 +0,0 @@ -package bleve - -import ( - zapv16 "github.com/blevesearch/zapx/v16" -) - -// Variables used for development and testing purposes -// var keys = map[string][]byte{} -// var cbLock = sync.RWMutex{} -// var latestCallbackId string - -var WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { - return "", func(data, counter []byte) ([]byte, error) { - return data, nil - }, nil -} - -var ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { - return func(data []byte) ([]byte, error) { - return data, nil - }, nil -} - -var CounterGetter = func() ([]byte, error) { - return nil, nil -} - -func init() { - // Variables used for development and testing purposes - // encryptionKey := make([]byte, 32) - // if _, err := rand.Read(encryptionKey); err != nil { - // panic("failed to generate AES key: " + err.Error()) - // } - - // latestCallbackId = "exampleCallback" - // keys[latestCallbackId] = encryptionKey - - // latestCallbackId = "exampleCallback" - - // WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { - // cbLock.RLock() - // if latestCallbackId == "" { - // return "", func(data []byte, _ []byte) ([]byte, error) { - // return data, nil - // }, nil - // } - // keyCopy := make([]byte, 32) - // keyIdCopy := latestCallbackId - // if key, exists := keys[latestCallbackId]; exists { - // copy(keyCopy, key) - // } - // cbLock.RUnlock() - - // block, err := aes.NewCipher(keyCopy) - // if err != nil { - // return "", nil, err - // } - // aesgcm, err := cipher.NewGCM(block) - // if err != nil { - // return "", nil, err - // } - - // return keyIdCopy, func(data, counter []byte) ([]byte, error) { - // ciphertext := aesgcm.Seal(nil, counter, data, nil) - // result := append(ciphertext, counter...) - // return result, nil - // }, nil - // } - - // ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { - // cbLock.RLock() - // keyCopy := make([]byte, 32) - // if key, exists := keys[cbId]; exists { - // copy(keyCopy, key) - // } - // cbLock.RUnlock() - - // if len(keyCopy) == 0 { - // return func(data []byte) ([]byte, error) { - // return data, nil - // }, nil - // } else { - // block, err := aes.NewCipher(keyCopy) - // if err != nil { - // return nil, err - // } - // aesgcm, err := cipher.NewGCM(block) - // if err != nil { - // return nil, err - // } - - // return func(data []byte) ([]byte, error) { - // if len(data) < 12 { - // return nil, fmt.Errorf("ciphertext too short") - // } - - // nonce := data[len(data)-12:] - // ciphertext := data[:len(data)-12] - - // plaintext, err := aesgcm.Open(nil, nonce, ciphertext, nil) - // if err != nil { - // return nil, fmt.Errorf("decryption failed: %w", err) - // } - - // return plaintext, nil - // }, nil - // } - // } - - // CounterGetter = func() ([]byte, error) { - // nonce := make([]byte, 12) // GCM standard - // if _, err := io.ReadFull(rand.Reader, nonce); err != nil { - // return nil, err - // } - // return nonce, nil - // } - - zapv16.WriterCallbackGetter = WriterCallbackGetter - zapv16.ReaderCallbackGetter = ReaderCallbackGetter - zapv16.CounterGetter = CounterGetter -} - -// Function used for development and testing purposes -// func SetNewCallback(callbackId string, key []byte) { -// if callbackId != "" { -// cbLock.Lock() -// keys[callbackId] = key -// latestCallbackId = callbackId -// cbLock.Unlock() -// } -// } - -type fileWriter struct { - writerCB func(data, counter []byte) ([]byte, error) - counter []byte - id string -} - -func NewFileWriter() (*fileWriter, error) { - var err error - rv := &fileWriter{} - rv.id, rv.writerCB, err = WriterCallbackGetter() - if err != nil { - return nil, err - } - rv.counter, err = CounterGetter() - if err != nil { - return nil, err - } - - return rv, nil -} - -func (w *fileWriter) Process(data []byte) ([]byte, error) { - if w.writerCB != nil { - w.incrementCounter() - return w.writerCB(data, w.counter) - } - return data, nil -} - -func (w *fileWriter) incrementCounter() { - if w.counter != nil { - for i := len(w.counter) - 1; i >= 0; i-- { - if w.counter[i] < 255 { - w.counter[i]++ - return - } - w.counter[i] = 0 - } - } -} - -type fileReader struct { - readerCB func(data []byte) ([]byte, error) - id string -} - -func NewFileReader(cbId string) (*fileReader, error) { - readerCB, err := ReaderCallbackGetter(cbId) - if err != nil { - return nil, err - } - - return &fileReader{ - readerCB: readerCB, - id: cbId, - }, nil -} - -func (r *fileReader) Process(data []byte) ([]byte, error) { - if r.readerCB != nil { - return r.readerCB(data) - } - return data, nil -} From 338ed1e9ff2a86beff4103e221fda91f8ba7943d Mon Sep 17 00:00:00 2001 From: Likith B Date: Mon, 22 Sep 2025 13:36:07 +0530 Subject: [PATCH 05/11] MB-65680: Added key management APIs and functionality --- document/document.go | 1 + index.go | 6 + index/scorch/builder.go | 2 +- index/scorch/merge.go | 5 +- index/scorch/persister.go | 195 ++++++++++++++++++++++++++++++--- index/scorch/scorch.go | 88 ++++++--------- index/scorch/snapshot_index.go | 4 +- index_impl.go | 46 +++++++- index_meta.go | 49 +++++++++ 9 files changed, 324 insertions(+), 72 deletions(-) diff --git a/document/document.go b/document/document.go index 569d57bd6..4a9777f62 100644 --- a/document/document.go +++ b/document/document.go @@ -35,6 +35,7 @@ type Document struct { CompositeFields []*CompositeField StoredFieldsSize uint64 indexed bool + subdocs []*Document } func (d *Document) StoredFieldsBytes() uint64 { diff --git a/index.go b/index.go index a9c8ada34..25342e30d 100644 --- a/index.go +++ b/index.go @@ -388,3 +388,9 @@ type SynonymIndex interface { // IndexSynonym indexes a synonym definition, with the specified id and belonging to the specified collection. IndexSynonym(id string, collection string, definition *SynonymDefinition) error } + +type CustomizableIndex interface { + Index + KeysInUse() (map[string]struct{}, error) + DropKeys(ids map[string]struct{}) error +} diff --git a/index/scorch/builder.go b/index/scorch/builder.go index bc5ec9e9d..d4d8e9c07 100644 --- a/index/scorch/builder.go +++ b/index/scorch/builder.go @@ -303,7 +303,7 @@ func (o *Builder) Close() error { } // fill the root bolt with this fake index snapshot - _, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil, nil, nil) + _, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil, nil) if err != nil { _ = tx.Rollback() _ = rootBolt.Close() diff --git a/index/scorch/merge.go b/index/scorch/merge.go index e49842107..15408a1d8 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -94,13 +94,16 @@ OUTER: if ctrlMsg.plan == nil { err = s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options, ourSnapshot) - } else { + } + + if err == nil { cw := newCloseChWrapper(s.closeCh, ctrlMsg.ctx) defer cw.close() go cw.listen() err = s.executePlanMergeAtSnapshot(ctrlMsg.plan, cw) } + if err != nil { atomic.StoreUint64(&s.iStats.mergeEpoch, 0) if err == segment.ErrClosed { diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 4a8a74817..7c24b9252 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -605,13 +605,8 @@ func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory, return err } -func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, - segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory, - writer *util.FileWriter) ( - []string, map[uint64]string, error) { - if writer == nil { - writer = &util.FileWriter{} - } +func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segPlugin SegmentPlugin, + exclude map[uint64]struct{}, d index.Directory) ([]string, map[uint64]string, error) { snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket) if err != nil { return nil, nil, err @@ -637,6 +632,15 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, if err != nil { return nil, nil, err } + writer, err := util.NewFileWriter() + if err != nil { + return nil, nil, err + } + + err = metaBucket.Put(util.BoltMetaDataWriterIdKey, []byte(writer.Id())) + if err != nil { + return nil, nil, err + } // Storing the timestamp at which the current indexSnapshot // was persisted, useful when you want to spread the @@ -784,7 +788,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot, exclude map[uint } }() - filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, exclude, nil, s.writer) + filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, exclude, nil) if err != nil { return err } @@ -860,6 +864,20 @@ func zapFileName(epoch uint64) string { // bolt snapshot code +var ( + boltSnapshotsBucket = []byte{'s'} + boltPathKey = []byte{'p'} + boltDeletedKey = []byte{'d'} + boltInternalKey = []byte{'i'} + boltMetaDataKey = []byte{'m'} + boltMetaDataSegmentTypeKey = []byte("type") + boltMetaDataSegmentVersionKey = []byte("version") + boltMetaDataTimeStamp = []byte("timeStamp") + boltMetaDataWriterIdKey = []byte("writerId") + boltStatsKey = []byte("stats") + TotBytesWrittenKey = []byte("TotBytesWritten") +) + func (s *Scorch) loadFromBolt() error { err := s.rootBolt.View(func(tx *bolt.Tx) error { snapshots := tx.Bucket(util.BoltSnapshotsBucket) @@ -969,6 +987,13 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { return nil, fmt.Errorf( "unable to load correct segment wrapper: %v", err) } + readerId := string(metaBucket.Get(boltMetaDataWriterIdKey)) + reader, err := util.NewFileReader(readerId) + if err != nil { + _ = rv.DecRef() + return nil, fmt.Errorf("unable to load correct reader: %v", err) + } + var running uint64 c := snapshot.Cursor() for k, _ := c.First(); k != nil; k, _ = c.Next() { @@ -979,7 +1004,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { return nil, fmt.Errorf("internal bucket missing") } err := internalBucket.ForEach(func(key []byte, val []byte) error { - copiedVal, err := s.reader.Process(append([]byte(nil), val...)) + copiedVal, err := reader.Process(append([]byte(nil), val...)) if err != nil { return err } @@ -996,7 +1021,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { _ = rv.DecRef() return nil, fmt.Errorf("segment key, but bucket missing %x", k) } - segmentSnapshot, err := s.loadSegment(segmentBucket) + segmentSnapshot, err := s.loadSegment(segmentBucket, reader) if err != nil { _ = rv.DecRef() return nil, fmt.Errorf("failed to load segment: %v", err) @@ -1018,7 +1043,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { return rv, nil } -func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) { +func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket, reader *util.FileReader) (*SegmentSnapshot, error) { pathBytes := segmentBucket.Get(util.BoltPathKey) if pathBytes == nil { return nil, fmt.Errorf("segment path missing") @@ -1036,7 +1061,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro } deletedBytes := segmentBucket.Get(util.BoltDeletedKey) if deletedBytes != nil { - deletedBytes, err = s.reader.Process(deletedBytes) + deletedBytes, err = reader.Process(deletedBytes) if err != nil { _ = segment.Close() return nil, err @@ -1055,7 +1080,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro statBytes := segmentBucket.Get(util.BoltStatsKey) if statBytes != nil { var statsMap map[string]map[string]uint64 - statBytes, err = s.reader.Process(statBytes) + statBytes, err = reader.Process(statBytes) if err != nil { _ = segment.Close() return nil, err @@ -1085,6 +1110,150 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro return rv, nil } +func (s *Scorch) boltKeysInUse() ([]string, error) { + keyMap := make(map[string]struct{}) + err := s.rootBolt.View(func(tx *bolt.Tx) error { + snapshots := tx.Bucket(boltSnapshotsBucket) + if snapshots == nil { + return nil + } + c := snapshots.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + snapshot := snapshots.Bucket(k) + if snapshot == nil { + continue + } + metaBucket := snapshot.Bucket(boltMetaDataKey) + if metaBucket == nil { + continue + } + keyId := string(metaBucket.Get(boltMetaDataWriterIdKey)) + keyMap[keyId] = struct{}{} + } + return nil + }) + if err != nil { + return nil, err + } + + rv := make([]string, 0, len(keyMap)) + for k := range keyMap { + rv = append(rv, k) + } + + return rv, nil +} + +func (s *Scorch) removeBoltKeys(ids []string) error { + keyMap := make(map[string]struct{}) + for _, id := range ids { + keyMap[id] = struct{}{} + } + writer, err := util.NewFileWriter() + if err != nil { + return err + } + + err = s.rootBolt.Update(func(tx *bolt.Tx) error { + snapshots := tx.Bucket(boltSnapshotsBucket) + if snapshots == nil { + return nil + } + c := snapshots.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + snapshot := snapshots.Bucket(k) + if snapshot == nil { + continue + } + metaBucket := snapshot.Bucket(boltMetaDataKey) + if metaBucket == nil { + continue + } + readerId := string(metaBucket.Get(boltMetaDataWriterIdKey)) + if _, ok := keyMap[readerId]; ok { + reader, err := util.NewFileReader(readerId) + if err != nil { + return fmt.Errorf("unable to load correct reader: %v", err) + } + c := snapshots.Cursor() + for kk, _ := c.First(); kk != nil; kk, _ = c.Next() { + if k[0] == boltInternalKey[0] { + internalBucket := snapshot.Bucket(kk) + if internalBucket == nil { + continue + } + // process all of the internal values and replace them with new values + err := internalBucket.ForEach(func(key []byte, val []byte) error { + buf, err := reader.Process(val) + if err != nil { + return err + } + + newBuf, err := writer.Process(buf) + if err != nil { + return err + } + return internalBucket.Put(key, newBuf) + }) + if err != nil { + return err + } + } else if kk[0] != boltMetaDataKey[0] { + segmentBucket := snapshot.Bucket(kk) + if segmentBucket == nil { + continue + } + // process the deleted key + deletedBytes := segmentBucket.Get(boltDeletedKey) + if deletedBytes != nil { + buf, err := reader.Process(deletedBytes) + if err != nil { + return err + } + + newBuf, err := writer.Process(buf) + if err != nil { + return err + } + err = segmentBucket.Put(boltDeletedKey, newBuf) + if err != nil { + return err + } + } + // process the stats key + statsBytes := segmentBucket.Get(boltStatsKey) + if statsBytes != nil { + buf, err := reader.Process(statsBytes) + if err != nil { + return err + } + + newBuf, err := writer.Process(buf) + if err != nil { + return err + } + err = segmentBucket.Put(boltStatsKey, newBuf) + if err != nil { + return err + } + } + } + } + err = metaBucket.Put(boltMetaDataWriterIdKey, []byte(writer.Id())) + if err != nil { + return err + } + } + } + return nil + }) + + if err != nil { + return err + } + return nil +} + func (s *Scorch) removeOldData() { removed, err := s.removeOldBoltSnapshots() if err != nil { diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 99aabce26..d79b353cf 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -80,9 +80,6 @@ type Scorch struct { rootBolt *bolt.DB asyncTasks sync.WaitGroup - writer *util.FileWriter - reader *util.FileReader - onEvent func(event Event) bool onAsyncError func(err error, path string) @@ -166,23 +163,6 @@ func NewScorch(storeName string, if ok { rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName] } - writerId, ok := config["writerId"].(string) - var writer *util.FileWriter - if ok { - writer, err = util.NewFileWriterWithId(writerId) - } else { - writer, err = util.NewFileWriter() - } - if err != nil { - return nil, err - } - rv.writer = writer - - reader, err := util.NewFileReader(rv.writer.Id()) - if err != nil { - return nil, err - } - rv.reader = reader // validate any custom persistor options to // prevent an async error in the persistor routine @@ -1061,36 +1041,55 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping }) } -// Used when a callback expires or is removed -func (s *Scorch) RemoveCallback(cbId string) error { - s.rootLock.Lock() - defer s.rootLock.Unlock() +func (s *Scorch) KeysInUse() ([]string, error) { + s.rootLock.RLock() + defer s.rootLock.RUnlock() - segsToCompact := make([]mergeplan.Segment, 0) + keyMap := make(map[string]struct{}) for _, segmentSnapShot := range s.root.segment { if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { - if seg.CallbackId() == cbId { - segsToCompact = append(segsToCompact, segmentSnapShot) - } + keyMap[seg.CallbackId()] = struct{}{} } } - if len(segsToCompact) > 0 { - return s.forceMergeSegs(segsToCompact) + boltKeys, err := s.boltKeysInUse() + if err != nil { + return nil, err } - return nil + for _, k := range boltKeys { + keyMap[k] = struct{}{} + } + + rv := make([]string, 0, len(keyMap)) + for k := range keyMap { + rv = append(rv, k) + } + + return rv, nil } -// Used when all callbacks need to be removed -func (s *Scorch) RemoveAllCallbacks() error { +func (s *Scorch) DropKeys(ids []string) error { + + keyMap := make(map[string]struct{}) + for _, k := range ids { + keyMap[k] = struct{}{} + } + + err := s.removeBoltKeys(ids) + if err != nil { + return err + } + s.rootLock.Lock() defer s.rootLock.Unlock() segsToCompact := make([]mergeplan.Segment, 0) for _, segmentSnapShot := range s.root.segment { - if _, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { - segsToCompact = append(segsToCompact, segmentSnapShot) + if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { + if _, ok := keyMap[seg.CallbackId()]; ok { + segsToCompact = append(segsToCompact, segmentSnapShot) + } } } @@ -1155,22 +1154,3 @@ func (s *Scorch) forceMergeSegs(segsToCompact []mergeplan.Segment) error { return nil } - -// CBIDsInUse returns a map of all the callback IDs that are currently in use -func (s *Scorch) CBIDsInUse() map[string]struct{} { - s.rootLock.RLock() - defer s.rootLock.RUnlock() - - rv := make(map[string]struct{}) - if s.root == nil { - return rv - } - - for _, segmentSnapShot := range s.root.segment { - if seg, ok := segmentSnapShot.segment.(segment.CustomizableSegment); ok { - rv[seg.CallbackId()] = struct{}{} - } - } - - return rv -} diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 6c989dfe2..07f5de2dc 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -91,6 +91,8 @@ type IndexSnapshot struct { // UpdateFieldInfo.Index or .Store or .DocValues). // Used to short circuit queries trying to read stale data updatedFields map[string]*index.UpdateFieldInfo + + writerId string } func (i *IndexSnapshot) Segments() []*SegmentSnapshot { @@ -1018,7 +1020,7 @@ func (is *IndexSnapshot) CopyTo(d index.Directory) error { return err } - _, _, err = prepareBoltSnapshot(is, tx, "", is.parent.segPlugin, nil, d, nil) + _, _, err = prepareBoltSnapshot(is, tx, "", is.parent.segPlugin, nil, d) if err != nil { _ = tx.Rollback() return fmt.Errorf("error backing up index snapshot: %v", err) diff --git a/index_impl.go b/index_impl.go index 84026ad68..20d6a7450 100644 --- a/index_impl.go +++ b/index_impl.go @@ -105,6 +105,7 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, rv.stats = &IndexStat{i: &rv} // at this point there is hope that we can be successful, so save index meta if path != "" { + kvconfig["callback_id"] = rv.writer.Id() err = rv.meta.Save(path, rv.writer) if err != nil { return nil, err @@ -112,7 +113,6 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, kvconfig["create_if_missing"] = true kvconfig["error_if_exists"] = true kvconfig["path"] = indexStorePath(path) - kvconfig["callback_id"] = rv.writer.Id() } else { kvconfig["path"] = "" } @@ -167,6 +167,11 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde return nil, err } + rv.writer, err = util.NewFileWriterWithId(rv.reader.Id()) + if err != nil { + return nil, err + } + // backwards compatibility if index type is missing if rv.meta.IndexType == "" { rv.meta.IndexType = upsidedown.Name @@ -183,7 +188,6 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde storeConfig["path"] = indexStorePath(path) storeConfig["create_if_missing"] = false storeConfig["error_if_exists"] = false - storeConfig["callback_id"] = rv.reader.Id() for rck, rcv := range runtimeConfig { storeConfig[rck] = rcv if rck == "updated_mapping" { @@ -490,6 +494,44 @@ func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) { return i.SearchInContext(context.Background(), req) } +func (i *indexImpl) KeysInUse() (map[string]struct{}, error) { + + keys := map[string]struct{}{} + keys[i.reader.Id()] = struct{}{} + + if cidx, ok := i.i.(index.CustomizableIndex); ok { + cKeys, err := cidx.KeysInUse() + if err != nil { + return nil, err + } + for k := range cKeys { + keys[k] = struct{}{} + } + } + + return keys, nil +} + +func (i *indexImpl) DropKeys(keys map[string]struct{}) error { + + if _, ok := keys[i.reader.Id()]; ok { + err := i.meta.UpdateWriter(i.path) + if err != nil { + return err + } + } + + if cidx, ok := i.i.(index.CustomizableIndex); ok { + return cidx.DropKeys(keys) + } else { + if _, ok := keys[""]; ok { + return fmt.Errorf("underlying index does not support DropKeys") + } + } + + return nil +} + var ( documentMatchEmptySize int searchContextEmptySize int diff --git a/index_meta.go b/index_meta.go index 72bbe624a..5d8c52129 100644 --- a/index_meta.go +++ b/index_meta.go @@ -151,6 +151,55 @@ func (i *indexMeta) CopyTo(d index.Directory) (err error) { return err } +func (i *indexMeta) UpdateWriter(path string) error { + indexMetaPath := indexMetaPath(path) + metaBytes, err := os.ReadFile(indexMetaPath) + if err != nil { + return ErrorIndexMetaMissing + } + + if len(metaBytes) < 4 { + return ErrorIndexMetaCorrupt + } + + pos := len(metaBytes) - 4 + writerIdLen := int(binary.BigEndian.Uint32(metaBytes[pos:])) + pos -= writerIdLen + if pos < 0 { + return ErrorIndexMetaCorrupt + } + + writerId := metaBytes[pos : pos+writerIdLen] + fileReader, err := util.NewFileReader(string(writerId)) + if err != nil { + return err + } + + metaBytes, err = fileReader.Process(metaBytes[0:pos]) + if err != nil { + return err + } + + writer, err := util.NewFileWriter() + if err != nil { + return err + } + + metaBytes, err = writer.Process(metaBytes) + if err != nil { + return err + } + + metaBytes = append(metaBytes, []byte(writer.Id())...) + binary.BigEndian.PutUint32(metaBytes, uint32(len(writer.Id()))) + err = os.WriteFile(indexMetaPath, metaBytes, 0666) + if err != nil { + return err + } + + return nil +} + func indexMetaPath(path string) string { return filepath.Join(path, metaFilename) } From 5e984b7bcaf945582413be55a0b140d8fa2cc8f2 Mon Sep 17 00:00:00 2001 From: Likith B Date: Mon, 22 Sep 2025 13:37:24 +0530 Subject: [PATCH 06/11] Removed irrelevant code --- document/document.go | 1 - 1 file changed, 1 deletion(-) diff --git a/document/document.go b/document/document.go index 4a9777f62..569d57bd6 100644 --- a/document/document.go +++ b/document/document.go @@ -35,7 +35,6 @@ type Document struct { CompositeFields []*CompositeField StoredFieldsSize uint64 indexed bool - subdocs []*Document } func (d *Document) StoredFieldsBytes() uint64 { From 4da3da5ddeab2744ff5b6c06f508baaeef091d78 Mon Sep 17 00:00:00 2001 From: Likith B Date: Mon, 22 Sep 2025 14:53:18 +0530 Subject: [PATCH 07/11] fixed typo --- index/scorch/merge.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 15408a1d8..b6da0c572 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -94,9 +94,7 @@ OUTER: if ctrlMsg.plan == nil { err = s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options, ourSnapshot) - } - - if err == nil { + } else { cw := newCloseChWrapper(s.closeCh, ctrlMsg.ctx) defer cw.close() go cw.listen() From 0580c120bb58474e7b6d0d237ee86b7ef2ce9269 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 29 Oct 2025 14:00:16 +0530 Subject: [PATCH 08/11] Minor fixes --- builder.go | 6 +- index/scorch/persister.go | 6 +- index_impl.go | 10 +-- index_meta.go | 8 +- index_meta_test.go | 5 +- util/callbacks.go | 155 +++++++++++++------------------------- 6 files changed, 75 insertions(+), 115 deletions(-) diff --git a/builder.go b/builder.go index 8ef2adc6e..a057eaa07 100644 --- a/builder.go +++ b/builder.go @@ -75,8 +75,10 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int // not the resulting index meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{}) - // CHECK where this is used - writer := &util.FileWriter{} + writer, err := util.NewFileWriter() + if err != nil { + return nil, err + } err = meta.Save(path, writer) if err != nil { return nil, err diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 7c24b9252..3e3b6ff33 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -677,7 +677,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP val := make([]byte, 8) bytesWritten := atomic.LoadUint64(&snapshot.parent.stats.TotBytesWrittenAtIndexTime) binary.LittleEndian.PutUint64(val, bytesWritten) - err = internalBucket.Put(util.TotBytesWrittenKey, val) + buf, err := writer.Process(val) + if err != nil { + return nil, nil, err + } + err = internalBucket.Put(util.TotBytesWrittenKey, buf) if err != nil { return nil, nil, err } diff --git a/index_impl.go b/index_impl.go index 20d6a7450..a2ab57a80 100644 --- a/index_impl.go +++ b/index_impl.go @@ -94,6 +94,10 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, if err != nil { return nil, err } + fileReader, err := util.NewFileReader(fileWriter.Id()) + if err != nil { + return nil, err + } rv := indexImpl{ path: path, @@ -101,6 +105,7 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, m: mapping, meta: newIndexMeta(indexType, kvstore, kvconfig), writer: fileWriter, + reader: fileReader, } rv.stats = &IndexStat{i: &rv} // at this point there is hope that we can be successful, so save index meta @@ -167,11 +172,6 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde return nil, err } - rv.writer, err = util.NewFileWriterWithId(rv.reader.Id()) - if err != nil { - return nil, err - } - // backwards compatibility if index type is missing if rv.meta.IndexType == "" { rv.meta.IndexType = upsidedown.Name diff --git a/index_meta.go b/index_meta.go index 5d8c52129..9c65b037f 100644 --- a/index_meta.go +++ b/index_meta.go @@ -71,8 +71,12 @@ func openIndexMeta(path string) (*indexMeta, *util.FileReader, error) { if err != nil { return nil, nil, err } - pos -= len(writerId) - err = util.UnmarshalJSON(metaBytes[0:pos], &im) + + buf, err := fileReader.Process(metaBytes[0:pos]) + if err != nil { + return nil, nil, err + } + err = util.UnmarshalJSON(buf, &im) if err != nil { return nil, nil, ErrorIndexMetaCorrupt } diff --git a/index_meta_test.go b/index_meta_test.go index d2fa5a805..bf3aa8873 100644 --- a/index_meta_test.go +++ b/index_meta_test.go @@ -36,7 +36,10 @@ func TestIndexMeta(t *testing.T) { t.Errorf("expected error, got nil") } - writer := &util.FileWriter{} + writer, err := util.NewFileWriter() + if err != nil { + t.Fatal(err) + } // create meta im := &indexMeta{Storage: "boltdb"} err = im.Save(testIndexPath, writer) diff --git a/util/callbacks.go b/util/callbacks.go index b24a76f5a..211739131 100644 --- a/util/callbacks.go +++ b/util/callbacks.go @@ -15,12 +15,6 @@ var WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, e }, nil } -var WriterCallbackGetterWithId = func(cbId string) (func(data, counter []byte) ([]byte, error), error) { - return func(data, counter []byte) ([]byte, error) { - return data, nil - }, nil -} - var ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { return func(data []byte) ([]byte, error) { return data, nil @@ -32,93 +26,74 @@ var CounterGetter = func() ([]byte, error) { } func init() { - // Variables used for development and testing purposes + // // Variables used for development and testing purposes // encryptionKey := make([]byte, 32) // if _, err := rand.Read(encryptionKey); err != nil { // panic("failed to generate AES key: " + err.Error()) // } - // latestCallbackId = "exampleCallback" - // keys[latestCallbackId] = encryptionKey + // key := make([]byte, 32) + // keyId := "test-key-id" - // latestCallbackId = "exampleCallback" + // if _, err := rand.Read(key); err != nil { + // panic("Failed to generate random key: " + err.Error()) + // } - // WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { - // cbLock.RLock() - // if latestCallbackId == "" { - // return "", func(data []byte, _ []byte) ([]byte, error) { - // return data, nil - // }, nil - // } - // keyCopy := make([]byte, 32) - // keyIdCopy := latestCallbackId - // if key, exists := keys[latestCallbackId]; exists { - // copy(keyCopy, key) - // } - // cbLock.RUnlock() + // block, err := aes.NewCipher(key) + // if err != nil { + // panic("Failed to create AES cipher: " + err.Error()) + // } - // block, err := aes.NewCipher(keyCopy) - // if err != nil { - // return "", nil, err - // } - // aesgcm, err := cipher.NewGCM(block) - // if err != nil { - // return "", nil, err + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // panic("Failed to create AES GCM: " + err.Error()) + // } + + // CounterGetter = func() ([]byte, error) { + // counter := make([]byte, 12) + // if _, err := rand.Read(counter); err != nil { + // return nil, err // } + // return counter, nil + // } + + // writerCallback := func(data, counter []byte) ([]byte, error) { + // ciphertext := aesgcm.Seal(nil, counter, data, nil) + // result := append(ciphertext, counter...) + + // // For testing purposes only + // result = append(append([]byte("EncStart"), result...), []byte("EncEnd")...) - // return keyIdCopy, func(data, counter []byte) ([]byte, error) { - // ciphertext := aesgcm.Seal(nil, counter, data, nil) - // result := append(ciphertext, counter...) - // return result, nil - // }, nil + // return result, nil // } - // ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { - // cbLock.RLock() - // keyCopy := make([]byte, 32) - // if key, exists := keys[cbId]; exists { - // copy(keyCopy, key) + // readerCallback := func(data []byte) ([]byte, error) { + // // For testing purposes only + // data = bytes.TrimPrefix(data, []byte("EncStart")) + // data = bytes.TrimSuffix(data, []byte("EncEnd")) + + // if len(data) < 12 { + // return nil, fmt.Errorf("ciphertext too short") // } - // cbLock.RUnlock() - - // if len(keyCopy) == 0 { - // return func(data []byte) ([]byte, error) { - // return data, nil - // }, nil - // } else { - // block, err := aes.NewCipher(keyCopy) - // if err != nil { - // return nil, err - // } - // aesgcm, err := cipher.NewGCM(block) - // if err != nil { - // return nil, err - // } - - // return func(data []byte) ([]byte, error) { - // if len(data) < 12 { - // return nil, fmt.Errorf("ciphertext too short") - // } - - // nonce := data[len(data)-12:] - // ciphertext := data[:len(data)-12] - - // plaintext, err := aesgcm.Open(nil, nonce, ciphertext, nil) - // if err != nil { - // return nil, fmt.Errorf("decryption failed: %w", err) - // } - - // return plaintext, nil - // }, nil + + // counter := data[len(data)-12:] + // ciphertext := data[:len(data)-12] + // plaintext, err := aesgcm.Open(nil, counter, ciphertext, nil) + // if err != nil { + // return nil, err // } + // return plaintext, nil // } - // CounterGetter = func() ([]byte, error) { - // nonce := make([]byte, 12) // GCM standard - // if _, err := io.ReadFull(rand.Reader, nonce); err != nil { - // return nil, err + // WriterCallbackGetter = func() (string, func(data []byte, counter []byte) ([]byte, error), error) { + // return keyId, writerCallback, nil + // } + + // ReaderCallbackGetter = func(id string) (func(data []byte) ([]byte, error), error) { + // if id != keyId { + // return nil, fmt.Errorf("unknown callback ID: %s", id) // } - // return nonce, nil + // return readerCallback, nil // } zapv16.WriterCallbackGetter = WriterCallbackGetter @@ -126,16 +101,6 @@ func init() { zapv16.CounterGetter = CounterGetter } -// Function used for development and testing purposes -// func SetNewCallback(callbackId string, key []byte) { -// if callbackId != "" { -// cbLock.Lock() -// keys[callbackId] = key -// latestCallbackId = callbackId -// cbLock.Unlock() -// } -// } - type FileWriter struct { writerCB func(data, counter []byte) ([]byte, error) counter []byte @@ -157,24 +122,6 @@ func NewFileWriter() (*FileWriter, error) { return rv, nil } -func NewFileWriterWithId(cbId string) (*FileWriter, error) { - writerCB, err := WriterCallbackGetterWithId(cbId) - if err != nil { - return nil, err - } - - counter, err := CounterGetter() - if err != nil { - return nil, err - } - - return &FileWriter{ - writerCB: writerCB, - counter: counter, - id: cbId, - }, nil -} - func (w *FileWriter) Process(data []byte) ([]byte, error) { if w.writerCB != nil { w.incrementCounter() From 9e54bce246c9ba2ca3fbc1b55a1d9669434ca783 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 29 Oct 2025 14:46:21 +0530 Subject: [PATCH 09/11] Addressing changes brought in by updated fields --- index/scorch/persister.go | 18 +++++-- index/scorch/scorch.go | 104 ++++++++++++++++++++++++++++++++++++-- util/keys.go | 1 + 3 files changed, 115 insertions(+), 8 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 3e3b6ff33..3218c9ee2 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -769,7 +769,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP if err != nil { return nil, nil, err } - err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, b) + updatedFieldsBytes, err := writer.Process(b) + if err != nil { + return nil, nil, err + } + err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, updatedFieldsBytes) if err != nil { return nil, nil, err } @@ -1067,7 +1071,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket, reader *util.FileReader if deletedBytes != nil { deletedBytes, err = reader.Process(deletedBytes) if err != nil { - _ = segment.Close() + _ = seg.Close() return nil, err } deletedBitmap := roaring.NewBitmap() @@ -1086,7 +1090,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket, reader *util.FileReader var statsMap map[string]map[string]uint64 statBytes, err = reader.Process(statBytes) if err != nil { - _ = segment.Close() + _ = seg.Close() return nil, err } err := json.Unmarshal(statBytes, &statsMap) @@ -1100,8 +1104,12 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket, reader *util.FileReader updatedFieldBytes := segmentBucket.Get(util.BoltUpdatedFieldsKey) if updatedFieldBytes != nil { var updatedFields map[string]*index.UpdateFieldInfo - - err := json.Unmarshal(updatedFieldBytes, &updatedFields) + updatedFieldBytes, err := reader.Process(updatedFieldBytes) + if err != nil { + _ = seg.Close() + return nil, err + } + err = json.Unmarshal(updatedFieldBytes, &updatedFields) if err != nil { _ = seg.Close() return nil, fmt.Errorf("error reading updated field bytes: %v", err) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index d79b353cf..5ecf26315 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -988,6 +988,27 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping continue } snapshot := snapshots.Bucket(k) + metaBucket := snapshot.Bucket(util.BoltMetaDataKey) + if metaBucket == nil { + return fmt.Errorf("meta-data bucket missing") + } + + writer, err := util.NewFileWriter() + if err != nil { + return fmt.Errorf("unable to load correct writer: %v", err) + } + + readerId := string(metaBucket.Get(boltMetaDataWriterIdKey)) + reader, err := util.NewFileReader(readerId) + if err != nil { + return fmt.Errorf("unable to load correct reader: %v", err) + } + + err = metaBucket.Put(util.BoltMetaDataWriterIdKey, []byte(writer.Id())) + if err != nil { + return err + } + cc := snapshot.Cursor() for kk, _ := cc.First(); kk != nil; kk, _ = cc.Next() { if kk[0] == util.BoltInternalKey[0] { @@ -995,19 +1016,56 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping if internalBucket == nil { return fmt.Errorf("segment key, but bucket missing %x", kk) } - err = internalBucket.Put(util.MappingInternalKey, mappingBytes) + + internalVals := make(map[string][]byte) + err := internalBucket.ForEach(func(key []byte, val []byte) error { + copiedVal, err := reader.Process(append([]byte(nil), val...)) + if err != nil { + return err + } + internalVals[string(key)] = copiedVal + return nil + }) if err != nil { return err } + + for key, val := range internalVals { + valBytes, err := writer.Process(val) + if err != nil { + return err + } + if key == string(util.MappingInternalKey) { + buf, err := writer.Process(mappingBytes) + if err != nil { + return err + } + err = internalBucket.Put([]byte(key), buf) + if err != nil { + return err + } + } else { + err = internalBucket.Put([]byte(key), valBytes) + if err != nil { + return err + } + } + } } else if kk[0] != util.BoltMetaDataKey[0] { segmentBucket := snapshot.Bucket(kk) if segmentBucket == nil { return fmt.Errorf("segment key, but bucket missing %x", kk) } + var updatedFields map[string]*index.UpdateFieldInfo updatedFieldBytes := segmentBucket.Get(util.BoltUpdatedFieldsKey) if updatedFieldBytes != nil { - err := json.Unmarshal(updatedFieldBytes, &updatedFields) + buf, err := reader.Process(updatedFieldBytes) + if err != nil { + return err + } + + err = json.Unmarshal(buf, &updatedFields) if err != nil { return fmt.Errorf("error reading updated field bytes: %v", err) } @@ -1030,10 +1088,50 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping if err != nil { return err } - err = segmentBucket.Put(util.BoltUpdatedFieldsKey, b) + buf, err := writer.Process(b) if err != nil { return err } + err = segmentBucket.Put(util.BoltUpdatedFieldsKey, buf) + if err != nil { + return err + } + + deletedBytes := segmentBucket.Get(util.BoltDeletedKey) + if deletedBytes != nil { + deletedBytes, err = reader.Process(deletedBytes) + if err != nil { + return err + } + + buf, err := writer.Process(deletedBytes) + if err != nil { + return err + } + + err = segmentBucket.Put(util.BoltDeletedKey, buf) + if err != nil { + return err + } + } + + statBytes := segmentBucket.Get(util.BoltStatsKey) + if statBytes != nil { + statBytes, err = reader.Process(statBytes) + if err != nil { + return err + } + + buf, err := writer.Process(statBytes) + if err != nil { + return err + } + + err = segmentBucket.Put(util.BoltStatsKey, buf) + if err != nil { + return err + } + } } } } diff --git a/util/keys.go b/util/keys.go index b71a7f48b..d4d47b22c 100644 --- a/util/keys.go +++ b/util/keys.go @@ -27,6 +27,7 @@ var ( BoltStatsKey = []byte("stats") BoltUpdatedFieldsKey = []byte("fields") TotBytesWrittenKey = []byte("TotBytesWritten") + BoltMetaDataWriterIdKey = []byte("writerId") MappingInternalKey = []byte("_mapping") ) From 86bb2067f7f81ba323f39c7a7886710e3e4dcc42 Mon Sep 17 00:00:00 2001 From: Likith B Date: Tue, 4 Nov 2025 15:47:44 +0530 Subject: [PATCH 10/11] Added basic encryption for Toy build --- util/callbacks.go | 144 ++++++++++++++++++++++++---------------------- 1 file changed, 75 insertions(+), 69 deletions(-) diff --git a/util/callbacks.go b/util/callbacks.go index 211739131..95097b247 100644 --- a/util/callbacks.go +++ b/util/callbacks.go @@ -1,6 +1,12 @@ package util import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "fmt" + zapv16 "github.com/blevesearch/zapx/v16" ) @@ -26,75 +32,75 @@ var CounterGetter = func() ([]byte, error) { } func init() { - // // Variables used for development and testing purposes - // encryptionKey := make([]byte, 32) - // if _, err := rand.Read(encryptionKey); err != nil { - // panic("failed to generate AES key: " + err.Error()) - // } - - // key := make([]byte, 32) - // keyId := "test-key-id" - - // if _, err := rand.Read(key); err != nil { - // panic("Failed to generate random key: " + err.Error()) - // } - - // block, err := aes.NewCipher(key) - // if err != nil { - // panic("Failed to create AES cipher: " + err.Error()) - // } - - // aesgcm, err := cipher.NewGCM(block) - // if err != nil { - // panic("Failed to create AES GCM: " + err.Error()) - // } - - // CounterGetter = func() ([]byte, error) { - // counter := make([]byte, 12) - // if _, err := rand.Read(counter); err != nil { - // return nil, err - // } - // return counter, nil - // } - - // writerCallback := func(data, counter []byte) ([]byte, error) { - // ciphertext := aesgcm.Seal(nil, counter, data, nil) - // result := append(ciphertext, counter...) - - // // For testing purposes only - // result = append(append([]byte("EncStart"), result...), []byte("EncEnd")...) - - // return result, nil - // } - - // readerCallback := func(data []byte) ([]byte, error) { - // // For testing purposes only - // data = bytes.TrimPrefix(data, []byte("EncStart")) - // data = bytes.TrimSuffix(data, []byte("EncEnd")) - - // if len(data) < 12 { - // return nil, fmt.Errorf("ciphertext too short") - // } - - // counter := data[len(data)-12:] - // ciphertext := data[:len(data)-12] - // plaintext, err := aesgcm.Open(nil, counter, ciphertext, nil) - // if err != nil { - // return nil, err - // } - // return plaintext, nil - // } - - // WriterCallbackGetter = func() (string, func(data []byte, counter []byte) ([]byte, error), error) { - // return keyId, writerCallback, nil - // } - - // ReaderCallbackGetter = func(id string) (func(data []byte) ([]byte, error), error) { - // if id != keyId { - // return nil, fmt.Errorf("unknown callback ID: %s", id) - // } - // return readerCallback, nil - // } + // Variables used for development and testing purposes + encryptionKey := make([]byte, 32) + if _, err := rand.Read(encryptionKey); err != nil { + panic("failed to generate AES key: " + err.Error()) + } + + key := make([]byte, 32) + keyId := "test-key-id" + + if _, err := rand.Read(key); err != nil { + panic("Failed to generate random key: " + err.Error()) + } + + block, err := aes.NewCipher(key) + if err != nil { + panic("Failed to create AES cipher: " + err.Error()) + } + + aesgcm, err := cipher.NewGCM(block) + if err != nil { + panic("Failed to create AES GCM: " + err.Error()) + } + + CounterGetter = func() ([]byte, error) { + counter := make([]byte, 12) + if _, err := rand.Read(counter); err != nil { + return nil, err + } + return counter, nil + } + + writerCallback := func(data, counter []byte) ([]byte, error) { + ciphertext := aesgcm.Seal(nil, counter, data, nil) + result := append(ciphertext, counter...) + + // For testing purposes only + result = append(append([]byte("EncStart"), result...), []byte("EncEnd")...) + + return result, nil + } + + readerCallback := func(data []byte) ([]byte, error) { + // For testing purposes only + data = bytes.TrimPrefix(data, []byte("EncStart")) + data = bytes.TrimSuffix(data, []byte("EncEnd")) + + if len(data) < 12 { + return nil, fmt.Errorf("ciphertext too short") + } + + counter := data[len(data)-12:] + ciphertext := data[:len(data)-12] + plaintext, err := aesgcm.Open(nil, counter, ciphertext, nil) + if err != nil { + return nil, err + } + return plaintext, nil + } + + WriterCallbackGetter = func() (string, func(data []byte, counter []byte) ([]byte, error), error) { + return keyId, writerCallback, nil + } + + ReaderCallbackGetter = func(id string) (func(data []byte) ([]byte, error), error) { + if id != keyId { + return nil, fmt.Errorf("unknown callback ID: %s", id) + } + return readerCallback, nil + } zapv16.WriterCallbackGetter = WriterCallbackGetter zapv16.ReaderCallbackGetter = ReaderCallbackGetter From e8c8761471ce5848f46b81ebceea1ebe8a3a1f94 Mon Sep 17 00:00:00 2001 From: Likith B Date: Thu, 13 Nov 2025 19:02:15 +0530 Subject: [PATCH 11/11] MB-65860 : Changed file writers and readers to include context --- builder.go | 2 +- index/scorch/persister.go | 48 ++----- index/scorch/scorch.go | 31 +---- index_impl.go | 5 +- index_meta.go | 16 +-- index_meta_test.go | 2 +- util/callbacks.go | 271 ++++++++++++++++++-------------------- util/keys.go | 2 + 8 files changed, 160 insertions(+), 217 deletions(-) diff --git a/builder.go b/builder.go index a057eaa07..ef3e45eb1 100644 --- a/builder.go +++ b/builder.go @@ -75,7 +75,7 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int // not the resulting index meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{}) - writer, err := util.NewFileWriter() + writer, err := util.NewFileWriter([]byte(metaFilename)) if err != nil { return nil, err } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 3218c9ee2..6dcd9f770 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -632,7 +632,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP if err != nil { return nil, nil, err } - writer, err := util.NewFileWriter() + writer, err := util.NewFileWriter(util.BoltWriterContext) if err != nil { return nil, nil, err } @@ -663,10 +663,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP } // TODO optimize writing these in order? for k, v := range snapshot.internal { - buf, err := writer.Process(v) - if err != nil { - return nil, nil, err - } + buf := writer.Process(v) err = internalBucket.Put([]byte(k), buf) if err != nil { return nil, nil, err @@ -677,10 +674,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP val := make([]byte, 8) bytesWritten := atomic.LoadUint64(&snapshot.parent.stats.TotBytesWrittenAtIndexTime) binary.LittleEndian.PutUint64(val, bytesWritten) - buf, err := writer.Process(val) - if err != nil { - return nil, nil, err - } + buf := writer.Process(val) err = internalBucket.Put(util.TotBytesWrittenKey, buf) if err != nil { return nil, nil, err @@ -737,10 +731,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP if err != nil { return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err) } - roaringBytes, err := writer.Process(roaringBuf.Bytes()) - if err != nil { - return nil, nil, err - } + roaringBytes := writer.Process(roaringBuf.Bytes()) err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBytes) if err != nil { return nil, nil, err @@ -753,10 +744,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP if err != nil { return nil, nil, err } - statsBytes, err := writer.Process(b) - if err != nil { - return nil, nil, err - } + statsBytes := writer.Process(b) err = snapshotSegmentBucket.Put(util.BoltStatsKey, statsBytes) if err != nil { return nil, nil, err @@ -769,10 +757,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, segP if err != nil { return nil, nil, err } - updatedFieldsBytes, err := writer.Process(b) - if err != nil { - return nil, nil, err - } + updatedFieldsBytes := writer.Process(b) err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, updatedFieldsBytes) if err != nil { return nil, nil, err @@ -996,7 +981,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { "unable to load correct segment wrapper: %v", err) } readerId := string(metaBucket.Get(boltMetaDataWriterIdKey)) - reader, err := util.NewFileReader(readerId) + reader, err := util.NewFileReader(readerId, util.BoltWriterContext) if err != nil { _ = rv.DecRef() return nil, fmt.Errorf("unable to load correct reader: %v", err) @@ -1161,7 +1146,7 @@ func (s *Scorch) removeBoltKeys(ids []string) error { for _, id := range ids { keyMap[id] = struct{}{} } - writer, err := util.NewFileWriter() + writer, err := util.NewFileWriter(util.BoltWriterContext) if err != nil { return err } @@ -1183,7 +1168,7 @@ func (s *Scorch) removeBoltKeys(ids []string) error { } readerId := string(metaBucket.Get(boltMetaDataWriterIdKey)) if _, ok := keyMap[readerId]; ok { - reader, err := util.NewFileReader(readerId) + reader, err := util.NewFileReader(readerId, util.BoltWriterContext) if err != nil { return fmt.Errorf("unable to load correct reader: %v", err) } @@ -1201,10 +1186,7 @@ func (s *Scorch) removeBoltKeys(ids []string) error { return err } - newBuf, err := writer.Process(buf) - if err != nil { - return err - } + newBuf := writer.Process(buf) return internalBucket.Put(key, newBuf) }) if err != nil { @@ -1223,10 +1205,7 @@ func (s *Scorch) removeBoltKeys(ids []string) error { return err } - newBuf, err := writer.Process(buf) - if err != nil { - return err - } + newBuf := writer.Process(buf) err = segmentBucket.Put(boltDeletedKey, newBuf) if err != nil { return err @@ -1240,10 +1219,7 @@ func (s *Scorch) removeBoltKeys(ids []string) error { return err } - newBuf, err := writer.Process(buf) - if err != nil { - return err - } + newBuf := writer.Process(buf) err = segmentBucket.Put(boltStatsKey, newBuf) if err != nil { return err diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 5ecf26315..ea0572a0b 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -993,13 +993,13 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping return fmt.Errorf("meta-data bucket missing") } - writer, err := util.NewFileWriter() + writer, err := util.NewFileWriter(util.BoltWriterContext) if err != nil { return fmt.Errorf("unable to load correct writer: %v", err) } readerId := string(metaBucket.Get(boltMetaDataWriterIdKey)) - reader, err := util.NewFileReader(readerId) + reader, err := util.NewFileReader(readerId, util.BoltWriterContext) if err != nil { return fmt.Errorf("unable to load correct reader: %v", err) } @@ -1031,15 +1031,9 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping } for key, val := range internalVals { - valBytes, err := writer.Process(val) - if err != nil { - return err - } + valBytes := writer.Process(val) if key == string(util.MappingInternalKey) { - buf, err := writer.Process(mappingBytes) - if err != nil { - return err - } + buf := writer.Process(mappingBytes) err = internalBucket.Put([]byte(key), buf) if err != nil { return err @@ -1088,10 +1082,7 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping if err != nil { return err } - buf, err := writer.Process(b) - if err != nil { - return err - } + buf := writer.Process(b) err = segmentBucket.Put(util.BoltUpdatedFieldsKey, buf) if err != nil { return err @@ -1104,11 +1095,7 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping return err } - buf, err := writer.Process(deletedBytes) - if err != nil { - return err - } - + buf := writer.Process(deletedBytes) err = segmentBucket.Put(util.BoltDeletedKey, buf) if err != nil { return err @@ -1122,11 +1109,7 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.UpdateFieldInfo, mapping return err } - buf, err := writer.Process(statBytes) - if err != nil { - return err - } - + buf := writer.Process(statBytes) err = segmentBucket.Put(util.BoltStatsKey, buf) if err != nil { return err diff --git a/index_impl.go b/index_impl.go index a2ab57a80..f145cb121 100644 --- a/index_impl.go +++ b/index_impl.go @@ -90,11 +90,11 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, return nil, fmt.Errorf("bleve not configured for file based indexing") } - fileWriter, err := util.NewFileWriter() + fileWriter, err := util.NewFileWriter([]byte(metaFilename)) if err != nil { return nil, err } - fileReader, err := util.NewFileReader(fileWriter.Id()) + fileReader, err := util.NewFileReader(fileWriter.Id(), []byte(metaFilename)) if err != nil { return nil, err } @@ -797,7 +797,6 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr if !contextScoreFusionKeyExists { setKnnHitsInCollector(knnHits, req, coll) } - if fts != nil { if is, ok := indexReader.(*scorch.IndexSnapshot); ok { diff --git a/index_meta.go b/index_meta.go index 9c65b037f..2ec068871 100644 --- a/index_meta.go +++ b/index_meta.go @@ -67,7 +67,7 @@ func openIndexMeta(path string) (*indexMeta, *util.FileReader, error) { } writerId := metaBytes[pos : pos+writerIdLen] - fileReader, err = util.NewFileReader(string(writerId)) + fileReader, err = util.NewFileReader(string(writerId), []byte(metaFilename)) if err != nil { return nil, nil, err } @@ -115,10 +115,7 @@ func (i *indexMeta) Save(path string, writer *util.FileWriter) (err error) { } }() - metaBytes, err = writer.Process(metaBytes) - if err != nil { - return err - } + metaBytes = writer.Process(metaBytes) _, err = indexMetaFile.Write(metaBytes) if err != nil { @@ -174,7 +171,7 @@ func (i *indexMeta) UpdateWriter(path string) error { } writerId := metaBytes[pos : pos+writerIdLen] - fileReader, err := util.NewFileReader(string(writerId)) + fileReader, err := util.NewFileReader(string(writerId), []byte(metaFilename)) if err != nil { return err } @@ -184,15 +181,12 @@ func (i *indexMeta) UpdateWriter(path string) error { return err } - writer, err := util.NewFileWriter() + writer, err := util.NewFileWriter([]byte(metaFilename)) if err != nil { return err } - metaBytes, err = writer.Process(metaBytes) - if err != nil { - return err - } + metaBytes = writer.Process(metaBytes) metaBytes = append(metaBytes, []byte(writer.Id())...) binary.BigEndian.PutUint32(metaBytes, uint32(len(writer.Id()))) diff --git a/index_meta_test.go b/index_meta_test.go index bf3aa8873..4f8f3f3cb 100644 --- a/index_meta_test.go +++ b/index_meta_test.go @@ -36,7 +36,7 @@ func TestIndexMeta(t *testing.T) { t.Errorf("expected error, got nil") } - writer, err := util.NewFileWriter() + writer, err := util.NewFileWriter([]byte(metaFilename)) if err != nil { t.Fatal(err) } diff --git a/util/callbacks.go b/util/callbacks.go index 95097b247..44ad4638e 100644 --- a/util/callbacks.go +++ b/util/callbacks.go @@ -1,151 +1,135 @@ package util -import ( - "bytes" - "crypto/aes" - "crypto/cipher" - "crypto/rand" - "fmt" - - zapv16 "github.com/blevesearch/zapx/v16" -) - -// Variables used for development and testing purposes -// var keys = map[string][]byte{} -// var cbLock = sync.RWMutex{} -// var latestCallbackId string - -var WriterCallbackGetter = func() (string, func(data, counter []byte) ([]byte, error), error) { - return "", func(data, counter []byte) ([]byte, error) { - return data, nil - }, nil -} - -var ReaderCallbackGetter = func(cbId string) (func(data []byte) ([]byte, error), error) { - return func(data []byte) ([]byte, error) { - return data, nil - }, nil -} +var WriterHook func(context []byte) (string, func(data []byte) []byte, error) -var CounterGetter = func() ([]byte, error) { - return nil, nil -} +var ReaderHook func(id string, context []byte) (func(data []byte) ([]byte, error), error) func init() { // Variables used for development and testing purposes - encryptionKey := make([]byte, 32) - if _, err := rand.Read(encryptionKey); err != nil { - panic("failed to generate AES key: " + err.Error()) - } - - key := make([]byte, 32) - keyId := "test-key-id" - - if _, err := rand.Read(key); err != nil { - panic("Failed to generate random key: " + err.Error()) - } - - block, err := aes.NewCipher(key) - if err != nil { - panic("Failed to create AES cipher: " + err.Error()) - } - - aesgcm, err := cipher.NewGCM(block) - if err != nil { - panic("Failed to create AES GCM: " + err.Error()) - } - - CounterGetter = func() ([]byte, error) { - counter := make([]byte, 12) - if _, err := rand.Read(counter); err != nil { - return nil, err - } - return counter, nil - } - - writerCallback := func(data, counter []byte) ([]byte, error) { - ciphertext := aesgcm.Seal(nil, counter, data, nil) - result := append(ciphertext, counter...) + // encryptionKey := make([]byte, 32) + // if _, err := rand.Read(encryptionKey); err != nil { + // panic("failed to generate AES key: " + err.Error()) + // } + + // key := make([]byte, 32) + // keyId := "test-key-id" + // label := []byte("search") + + // if _, err := rand.Read(key); err != nil { + // panic("Failed to generate random key: " + err.Error()) + // } + + // WriterHook = func(context []byte) (string, func(data []byte) []byte, error) { + + // derivedKey := make([]byte, 32) + // derivedKey, err := crypto.OpenSSLKBKDFDeriveKey(key, label, context, derivedKey, "SHA2-256", "") + // if err != nil { + // return "", nil, err + // } + + // block, err := aes.NewCipher(derivedKey) + // if err != nil { + // panic("Failed to create AES cipher: " + err.Error()) + // } + + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // panic("Failed to create AES GCM: " + err.Error()) + // } + + // nonce := make([]byte, 12) + // if _, err := rand.Read(nonce); err != nil { + // panic("Failed to generate random nonce: " + err.Error()) + // } + + // writerCallback := func(data []byte) []byte { + // ciphertext := aesgcm.Seal(nil, nonce, data, nil) + // result := append(ciphertext, nonce...) + + // for i := len(nonce) - 1; i >= 0; i-- { + // if nonce[i] < 255 { + // nonce[i]++ + // break + // } + // nonce[i] = 0 + // } + // return result + // } + + // return keyId, writerCallback, nil + // } + + // ReaderHook = func(id string, context []byte) (func(data []byte) ([]byte, error), error) { + // if id != keyId { + // return nil, fmt.Errorf("unknown callback ID: %s", id) + // } + + // derivedKey := make([]byte, 32) + // derivedKey, err := crypto.OpenSSLKBKDFDeriveKey(key, label, context, derivedKey, "SHA2-256", "") + // if err != nil { + // return nil, err + // } + + // block, err := aes.NewCipher(derivedKey) + // if err != nil { + // panic("Failed to create AES cipher: " + err.Error()) + // } + + // aesgcm, err := cipher.NewGCM(block) + // if err != nil { + // panic("Failed to create AES GCM: " + err.Error()) + // } + + // readerCallback := func(data []byte) ([]byte, error) { + + // if len(data) < 12 { + // return nil, fmt.Errorf("ciphertext too short") + // } + + // nonce := data[len(data)-12:] + // ciphertext := data[:len(data)-12] + // plaintext, err := aesgcm.Open(nil, nonce, ciphertext, nil) + // if err != nil { + // return nil, fmt.Errorf("failed to decrypt data: %w", err) + // } + + // return plaintext, nil + // } + + // return readerCallback, nil + // } + + // zapv16.WriterHook = WriterHook + // zapv16.ReaderHook = ReaderHook +} - // For testing purposes only - result = append(append([]byte("EncStart"), result...), []byte("EncEnd")...) +type FileWriter struct { + processor func(data []byte) []byte + context []byte + id string +} - return result, nil +func NewFileWriter(context []byte) (*FileWriter, error) { + rv := &FileWriter{ + context: context, } - readerCallback := func(data []byte) ([]byte, error) { - // For testing purposes only - data = bytes.TrimPrefix(data, []byte("EncStart")) - data = bytes.TrimSuffix(data, []byte("EncEnd")) - - if len(data) < 12 { - return nil, fmt.Errorf("ciphertext too short") - } - - counter := data[len(data)-12:] - ciphertext := data[:len(data)-12] - plaintext, err := aesgcm.Open(nil, counter, ciphertext, nil) + if WriterHook != nil { + var err error + rv.id, rv.processor, err = WriterHook(rv.context) if err != nil { return nil, err } - return plaintext, nil - } - - WriterCallbackGetter = func() (string, func(data []byte, counter []byte) ([]byte, error), error) { - return keyId, writerCallback, nil - } - - ReaderCallbackGetter = func(id string) (func(data []byte) ([]byte, error), error) { - if id != keyId { - return nil, fmt.Errorf("unknown callback ID: %s", id) - } - return readerCallback, nil - } - - zapv16.WriterCallbackGetter = WriterCallbackGetter - zapv16.ReaderCallbackGetter = ReaderCallbackGetter - zapv16.CounterGetter = CounterGetter -} - -type FileWriter struct { - writerCB func(data, counter []byte) ([]byte, error) - counter []byte - id string -} - -func NewFileWriter() (*FileWriter, error) { - var err error - rv := &FileWriter{} - rv.id, rv.writerCB, err = WriterCallbackGetter() - if err != nil { - return nil, err - } - rv.counter, err = CounterGetter() - if err != nil { - return nil, err } return rv, nil } -func (w *FileWriter) Process(data []byte) ([]byte, error) { - if w.writerCB != nil { - w.incrementCounter() - return w.writerCB(data, w.counter) - } - return data, nil -} - -func (w *FileWriter) incrementCounter() { - if w.counter != nil { - for i := len(w.counter) - 1; i >= 0; i-- { - if w.counter[i] < 255 { - w.counter[i]++ - return - } - w.counter[i] = 0 - } +func (w *FileWriter) Process(data []byte) []byte { + if w.processor != nil { + return w.processor(data) } + return data } func (w *FileWriter) Id() string { @@ -153,25 +137,30 @@ func (w *FileWriter) Id() string { } type FileReader struct { - readerCB func(data []byte) ([]byte, error) - id string + processor func(data []byte) ([]byte, error) + id string + context []byte } -func NewFileReader(cbId string) (*FileReader, error) { - readerCB, err := ReaderCallbackGetter(cbId) - if err != nil { - return nil, err +func NewFileReader(id string, context []byte) (*FileReader, error) { + rv := &FileReader{ + id: id, } - return &FileReader{ - readerCB: readerCB, - id: cbId, - }, nil + if ReaderHook != nil { + var err error + rv.processor, err = ReaderHook(id, context) + if err != nil { + return nil, err + } + } + + return rv, nil } func (r *FileReader) Process(data []byte) ([]byte, error) { - if r.readerCB != nil { - return r.readerCB(data) + if r.processor != nil { + return r.processor(data) } return data, nil } diff --git a/util/keys.go b/util/keys.go index d4d47b22c..cd994c970 100644 --- a/util/keys.go +++ b/util/keys.go @@ -30,4 +30,6 @@ var ( BoltMetaDataWriterIdKey = []byte("writerId") MappingInternalKey = []byte("_mapping") + + BoltWriterContext = []byte("bolt") )