30 changes: 16 additions & 14 deletions index/scorch/snapshot_index.go
@@ -815,6 +815,10 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
 	// Filter out fields that have been completely deleted or had their
 	// docvalues data deleted from both visitable fields and required fields
 	filterUpdatedFields := func(fields []string) []string {
+		// fast path: if no updatedFields just return the input
+		if len(is.updatedFields) == 0 {
+			return fields
+		}
 		filteredFields := make([]string, 0, len(fields))
 		for _, field := range fields {
 			if info, ok := is.updatedFields[field]; ok &&
@@ -826,8 +830,8 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
 		return filteredFields
 	}
 
-	fieldsFiltered := filterUpdatedFields(fields)
-	vFieldsFiltered := filterUpdatedFields(vFields)
+	fields = filterUpdatedFields(fields)
+	vFields = filterUpdatedFields(vFields)
 
 	var errCh chan error
 
@@ -836,9 +840,9 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
 	// if the caller happens to know we're on the same segmentIndex
 	// from a previous invocation
 	if cFields == nil {
-		cFields = subtractStrings(fieldsFiltered, vFieldsFiltered)
+		cFields = subtractStrings(fields, vFields)
 
-		if !ss.cachedDocs.hasFields(cFields) {
+		if len(cFields) > 0 && !ss.cachedDocs.hasFields(cFields) {
 			errCh = make(chan error, 1)
 
 			go func() {
@@ -851,8 +855,8 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
 		}
 	}
 
-	if ssvOk && ssv != nil && len(vFieldsFiltered) > 0 {
-		dvs, err = ssv.VisitDocValues(localDocNum, fieldsFiltered, visitor, dvs)
+	if ssvOk && ssv != nil && len(vFields) > 0 {
+		dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -980,17 +984,15 @@ func subtractStrings(a, b []string) []string {
 		return a
 	}
 
-	// Create a map for O(1) lookups
-	bMap := make(map[string]struct{}, len(b))
-	for _, bs := range b {
-		bMap[bs] = struct{}{}
-	}
-
 	rv := make([]string, 0, len(a))
+OUTER:
 	for _, as := range a {
-		if _, exists := bMap[as]; !exists {
-			rv = append(rv, as)
+		for _, bs := range b {
+			if as == bs {
+				continue OUTER
+			}
 		}
+		rv = append(rv, as)
 	}
 	return rv
 }
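Note: the hunk above replaces the map-based set difference in subtractStrings with a labeled nested loop. Since a and b here are short lists of field names, the quadratic loop avoids the map allocation and hashing, which typically dominates for tiny inputs. A minimal standalone sketch of the two variants (hypothetical helper names, not part of this PR):

package main

import "fmt"

// subtractLoop returns the strings in a that are not in b.
// No allocation beyond the result slice; O(len(a)*len(b)), but
// cheap when both slices hold only a few field names.
func subtractLoop(a, b []string) []string {
	rv := make([]string, 0, len(a))
OUTER:
	for _, as := range a {
		for _, bs := range b {
			if as == bs {
				continue OUTER
			}
		}
		rv = append(rv, as)
	}
	return rv
}

// subtractMap is the replaced variant: O(len(a)+len(b)) lookups,
// but it pays for a map allocation and hashing on every call.
func subtractMap(a, b []string) []string {
	bMap := make(map[string]struct{}, len(b))
	for _, bs := range b {
		bMap[bs] = struct{}{}
	}
	rv := make([]string, 0, len(a))
	for _, as := range a {
		if _, exists := bMap[as]; !exists {
			rv = append(rv, as)
		}
	}
	return rv
}

func main() {
	a := []string{"title", "body", "tags"}
	b := []string{"body"}
	fmt.Println(subtractLoop(a, b)) // [title tags]
	fmt.Println(subtractMap(a, b))  // [title tags]
}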
16 changes: 8 additions & 8 deletions index/scorch/snapshot_segment.go
@@ -241,7 +241,7 @@ func (cfd *cachedFieldDocs) prepareField(field string, ss *SegmentSnapshot) {
 
 type cachedDocs struct {
 	size  uint64
-	m     sync.Mutex // As the cache is asynchronously prepared, need a lock
+	m     sync.RWMutex // As the cache is asynchronously prepared, need a lock
 	cache map[string]*cachedFieldDocs // Keyed by field
 }
 
@@ -283,14 +283,14 @@ func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) e
 
 // hasFields returns true if the cache has all the given fields
 func (c *cachedDocs) hasFields(fields []string) bool {
-	c.m.Lock()
+	c.m.RLock()
	for _, field := range fields {
 		if _, exists := c.cache[field]; !exists {
-			c.m.Unlock()
+			c.m.RUnlock()
 			return false // found a field not in cache
 		}
 	}
-	c.m.Unlock()
+	c.m.RUnlock()
 	return true
 }
 
@@ -311,13 +311,13 @@ func (c *cachedDocs) updateSizeLOCKED() {
 
 func (c *cachedDocs) visitDoc(localDocNum uint64,
 	fields []string, visitor index.DocValueVisitor) {
-	c.m.Lock()
+	c.m.RLock()
 
 	for _, field := range fields {
 		if cachedFieldDocs, exists := c.cache[field]; exists {
-			c.m.Unlock()
+			c.m.RUnlock()
 			<-cachedFieldDocs.readyCh
-			c.m.Lock()
+			c.m.RLock()
 
 			if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists {
 				for {
@@ -332,7 +332,7 @@ func (c *cachedDocs) visitDoc(localDocNum uint64,
 		}
 	}
 
-	c.m.Unlock()
+	c.m.RUnlock()
 }
 
 // the purpose of the cachedMeta is to simply allow the user of this type to record
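Note: switching cachedDocs.m from sync.Mutex to sync.RWMutex lets concurrent searches run hasFields and visitDoc in parallel under read locks; only cache preparation still needs the exclusive lock. A minimal sketch of the pattern (simplified types, not the actual bleve structs):

package main

import (
	"fmt"
	"sync"
)

// cache is a read-mostly map guarded by an RWMutex: many readers
// may hold RLock simultaneously; Lock is exclusive for writers.
type cache struct {
	m    sync.RWMutex
	data map[string]string
}

// get takes only a read lock, so concurrent gets do not serialize.
func (c *cache) get(k string) (string, bool) {
	c.m.RLock()
	defer c.m.RUnlock()
	v, ok := c.data[k]
	return v, ok
}

// put takes the exclusive write lock.
func (c *cache) put(k, v string) {
	c.m.Lock()
	defer c.m.Unlock()
	c.data[k] = v
}

func main() {
	c := &cache{data: map[string]string{}}
	c.put("field", "docs")
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { // readers proceed in parallel under RLock
			defer wg.Done()
			v, _ := c.get("field")
			fmt.Println(v)
		}()
	}
	wg.Wait()
}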
8 changes: 8 additions & 0 deletions numeric/prefix_coded.go
@@ -66,6 +66,14 @@ func MustNewPrefixCodedInt64(in int64, shift uint) PrefixCoded {
 	return rv
 }
 
+func MustNewPrefixCodedInt64Prealloc(in int64, shift uint, prealloc []byte) PrefixCoded {
+	rv, _, err := NewPrefixCodedInt64Prealloc(in, shift, prealloc)
+	if err != nil {
+		panic(err)
+	}
+	return rv
+}
+
 // Shift returns the number of bits shifted
 // returns 0 if in uninitialized state
 func (p PrefixCoded) Shift() (uint, error) {
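Note: MustNewPrefixCodedInt64Prealloc mirrors MustNewPrefixCodedInt64 but encodes into a caller-supplied buffer, discarding the second return value of NewPrefixCodedInt64Prealloc (which, judging from its use here, is the unused remainder of the buffer). A hedged usage sketch, assuming the import path and that the remainder can be threaded through a loop to amortize allocations; the buffer size is illustrative:

package main

import (
	"fmt"

	"github.com/blevesearch/bleve/v2/numeric"
)

func main() {
	// One buffer reused across many encodings: each call carves its
	// result out of the remaining prealloc space instead of
	// allocating a fresh slice per term.
	rest := make([]byte, 1024)
	for i := int64(0); i < 4; i++ {
		var pc numeric.PrefixCoded
		var err error
		pc, rest, err = numeric.NewPrefixCodedInt64Prealloc(i, 0, rest)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%d -> % x\n", i, []byte(pc))
	}
}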
2 changes: 2 additions & 0 deletions search/query/boolean.go
@@ -204,6 +204,8 @@ func (q *BooleanQuery) Searcher(ctx context.Context, i index.IndexReader, m mapp
 			// Compare document IDs
 			cmp := refDoc.IndexInternalID.Compare(d.IndexInternalID)
 			if cmp < 0 {
+				// recycle refDoc now that we do not need it
+				sctx.DocumentMatchPool.Put(refDoc)
 				// filterSearcher is behind the current document, Advance() it
 				refDoc, err = filterSearcher.Advance(sctx, d.IndexInternalID)
 				if err != nil || refDoc == nil {
6 changes: 6 additions & 0 deletions search/searcher/search_filter.go
@@ -60,6 +60,9 @@ func (f *FilteringSearcher) Next(ctx *search.SearchContext) (*search.DocumentMat
 		if f.accept(ctx, next) {
 			return next, nil
 		}
+		// recycle this document match now, since
+		// we do not need it anymore
+		ctx.DocumentMatchPool.Put(next)
 		next, err = f.child.Next(ctx)
 	}
 	return nil, err
@@ -76,6 +79,9 @@ func (f *FilteringSearcher) Advance(ctx *search.SearchContext, ID index.IndexInt
 		if f.accept(ctx, adv) {
 			return adv, nil
 		}
+		// recycle this document match now, since
+		// we do not need it anymore
+		ctx.DocumentMatchPool.Put(adv)
 		return f.Next(ctx)
 	}
 
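Note: the boolean.go and search_filter.go changes plug the same leak: a DocumentMatch drawn from the per-search pool was simply dropped when a filter rejected it (or when the boolean filter advanced past it), so the pool had to allocate a replacement on the next Get. A minimal sketch of the pool discipline (simplified pool and match types, not bleve's actual API):

package main

import "fmt"

type documentMatch struct{ id int }

// matchPool hands out documentMatch values and reuses the ones
// returned via put; anything not returned costs an allocation later.
type matchPool struct{ free []*documentMatch }

func (p *matchPool) get() *documentMatch {
	if n := len(p.free); n > 0 {
		m := p.free[n-1]
		p.free = p.free[:n-1]
		return m
	}
	return &documentMatch{} // pool empty: allocate
}

func (p *matchPool) put(m *documentMatch) { p.free = append(p.free, m) }

func main() {
	pool := &matchPool{}
	accept := func(m *documentMatch) bool { return m.id%2 == 0 }
	for i := 0; i < 6; i++ {
		m := pool.get()
		m.id = i
		if accept(m) {
			fmt.Println("matched", m.id)
			pool.put(m) // caller done with it in this demo
			continue
		}
		// the fix in this PR: a rejected match goes back to the
		// pool instead of leaking
		pool.put(m)
	}
}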
47 changes: 27 additions & 20 deletions search/searcher/search_geoboundingbox.go
@@ -53,7 +53,7 @@ func NewGeoBoundingBoxSearcher(ctx context.Context, indexReader index.IndexReade
 		}
 
 		return NewFilteringSearcher(ctx, boxSearcher, buildRectFilter(ctx, dvReader,
-			field, minLon, minLat, maxLon, maxLat)), nil
+			minLon, minLat, maxLon, maxLat)), nil
 	}
 }
 
@@ -88,7 +88,7 @@ func NewGeoBoundingBoxSearcher(ctx context.Context, indexReader index.IndexReade
 		}
 		// add filter to check points near the boundary
 		onBoundarySearcher = NewFilteringSearcher(ctx, rawOnBoundarySearcher,
-			buildRectFilter(ctx, dvReader, field, minLon, minLat, maxLon, maxLat))
+			buildRectFilter(ctx, dvReader, minLon, minLat, maxLon, maxLat))
 		openedSearchers = append(openedSearchers, onBoundarySearcher)
 	}
 
@@ -205,28 +205,35 @@ func buildIsIndexedFunc(ctx context.Context, indexReader index.IndexReader, fiel
 	return isIndexed, closeF, err
 }
 
-func buildRectFilter(ctx context.Context, dvReader index.DocValueReader, field string,
+func buildRectFilter(ctx context.Context, dvReader index.DocValueReader,
 	minLon, minLat, maxLon, maxLat float64,
 ) FilterFunc {
+	// reuse the following for each document match that is checked using the filter
+	var lons, lats []float64
+	var found bool
+	dvVisitor := func(_ string, term []byte) {
+		if found {
+			// avoid redundant work if already found
+			return
+		}
+		// only consider the values which are shifted 0
+		prefixCoded := numeric.PrefixCoded(term)
+		shift, err := prefixCoded.Shift()
+		if err == nil && shift == 0 {
+			var i64 int64
+			i64, err = prefixCoded.Int64()
+			if err == nil {
+				lons = append(lons, geo.MortonUnhashLon(uint64(i64)))
+				lats = append(lats, geo.MortonUnhashLat(uint64(i64)))
+				found = true
+			}
+		}
+	}
 	return func(sctx *search.SearchContext, d *search.DocumentMatch) bool {
 		// check geo matches against all numeric type terms indexed
-		var lons, lats []float64
-		var found bool
-		err := dvReader.VisitDocValues(d.IndexInternalID, func(field string, term []byte) {
-			// only consider the values which are shifted 0
-			prefixCoded := numeric.PrefixCoded(term)
-			shift, err := prefixCoded.Shift()
-			if err == nil && shift == 0 {
-				var i64 int64
-				i64, err = prefixCoded.Int64()
-				if err == nil {
-					lons = append(lons, geo.MortonUnhashLon(uint64(i64)))
-					lats = append(lats, geo.MortonUnhashLat(uint64(i64)))
-					found = true
-				}
-			}
-		})
-		if err == nil && found {
+		lons, lats = lons[:0], lats[:0]
+		found = false
+		if err := dvReader.VisitDocValues(d.IndexInternalID, dvVisitor); err == nil && found {
 			bytes := dvReader.BytesRead()
 			if bytes > 0 {
 				reportIOStats(ctx, bytes)
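Note: this refactor (applied identically in search_geopointdistance.go and search_geopolygon.go below) hoists the doc-values visitor out of the returned FilterFunc: the lons/lats scratch slices and the closure are allocated once per filter and reset via lons[:0] per document, instead of being re-created for every candidate match, and the visitor now bails out early once a usable value is found. The tradeoff is that the FilterFunc carries mutable shared state, so it assumes the filter is invoked from one goroutine per search, which appears to be the case for these searchers. A minimal standalone sketch of the pattern (toy validity check standing in for the shift == 0 test, not the bleve geo code):

package main

import "fmt"

// makeFilter builds a per-query filter. The scratch slice and the
// visit closure are created once here, not once per document.
func makeFilter(threshold int) func(doc []int) bool {
	var vals []int
	var found bool
	visit := func(v int) {
		if found {
			return // early out: one usable value is enough
		}
		if v >= 0 { // toy validity check
			vals = append(vals, v)
			found = true
		}
	}
	return func(doc []int) bool {
		vals = vals[:0] // reset scratch state instead of reallocating
		found = false
		for _, v := range doc {
			visit(v)
		}
		return found && vals[0] > threshold
	}
}

func main() {
	filter := makeFilter(10)
	fmt.Println(filter([]int{-1, 42})) // true
	fmt.Println(filter([]int{-1, 5}))  // false
}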
44 changes: 25 additions & 19 deletions search/searcher/search_geopointdistance.go
@@ -66,7 +66,7 @@ func NewGeoPointDistanceSearcher(ctx context.Context, indexReader index.IndexRea
 
 	// wrap it in a filtering searcher which checks the actual distance
 	return NewFilteringSearcher(ctx, rectSearcher,
-		buildDistFilter(ctx, dvReader, field, centerLon, centerLat, dist)), nil
+		buildDistFilter(ctx, dvReader, centerLon, centerLat, dist)), nil
 }
 
 // boxSearcher builds a searcher for the described bounding box
@@ -113,27 +113,33 @@ func boxSearcher(ctx context.Context, indexReader index.IndexReader,
 	return boxSearcher, nil
 }
 
-func buildDistFilter(ctx context.Context, dvReader index.DocValueReader, field string,
+func buildDistFilter(ctx context.Context, dvReader index.DocValueReader,
 	centerLon, centerLat, maxDist float64) FilterFunc {
+	// reuse the following for each document match that is checked using the filter
+	var lons, lats []float64
+	var found bool
+	dvVisitor := func(_ string, term []byte) {
+		if found {
+			// avoid redundant work if already found
+			return
+		}
+		// only consider the values which are shifted 0
+		prefixCoded := numeric.PrefixCoded(term)
+		shift, err := prefixCoded.Shift()
+		if err == nil && shift == 0 {
+			i64, err := prefixCoded.Int64()
+			if err == nil {
+				lons = append(lons, geo.MortonUnhashLon(uint64(i64)))
+				lats = append(lats, geo.MortonUnhashLat(uint64(i64)))
+				found = true
+			}
+		}
+	}
 	return func(sctx *search.SearchContext, d *search.DocumentMatch) bool {
 		// check geo matches against all numeric type terms indexed
-		var lons, lats []float64
-		var found bool
-
-		err := dvReader.VisitDocValues(d.IndexInternalID, func(field string, term []byte) {
-			// only consider the values which are shifted 0
-			prefixCoded := numeric.PrefixCoded(term)
-			shift, err := prefixCoded.Shift()
-			if err == nil && shift == 0 {
-				i64, err := prefixCoded.Int64()
-				if err == nil {
-					lons = append(lons, geo.MortonUnhashLon(uint64(i64)))
-					lats = append(lats, geo.MortonUnhashLat(uint64(i64)))
-					found = true
-				}
-			}
-		})
-		if err == nil && found {
+		lons, lats = lons[:0], lats[:0]
+		found = false
+		if err := dvReader.VisitDocValues(d.IndexInternalID, dvVisitor); err == nil && found {
 			bytes := dvReader.BytesRead()
 			if bytes > 0 {
 				reportIOStats(ctx, bytes)
49 changes: 27 additions & 22 deletions search/searcher/search_geopolygon.go
@@ -85,28 +85,37 @@ func almostEqual(a, b float64) bool {
 // here: https://wrf.ecse.rpi.edu/nikola/pubdetails/pnpoly.html
 func buildPolygonFilter(ctx context.Context, dvReader index.DocValueReader, field string,
 	coordinates []geo.Point) FilterFunc {
+	// reuse the following for each document match that is checked using the filter
+	var lons, lats []float64
+	var found bool
+	dvVisitor := func(_ string, term []byte) {
+		if found {
+			// avoid redundant work if already found
+			return
+		}
+		// only consider the values which are shifted 0
+		prefixCoded := numeric.PrefixCoded(term)
+		shift, err := prefixCoded.Shift()
+		if err == nil && shift == 0 {
+			i64, err := prefixCoded.Int64()
+			if err == nil {
+				lons = append(lons, geo.MortonUnhashLon(uint64(i64)))
+				lats = append(lats, geo.MortonUnhashLat(uint64(i64)))
+				found = true
+			}
+		}
+	}
+	rayIntersectsSegment := func(point, a, b geo.Point) bool {
+		return (a.Lat > point.Lat) != (b.Lat > point.Lat) &&
+			point.Lon < (b.Lon-a.Lon)*(point.Lat-a.Lat)/(b.Lat-a.Lat)+a.Lon
+	}
 	return func(sctx *search.SearchContext, d *search.DocumentMatch) bool {
 		// check geo matches against all numeric type terms indexed
-		var lons, lats []float64
-		var found bool
-
-		err := dvReader.VisitDocValues(d.IndexInternalID, func(field string, term []byte) {
-			// only consider the values which are shifted 0
-			prefixCoded := numeric.PrefixCoded(term)
-			shift, err := prefixCoded.Shift()
-			if err == nil && shift == 0 {
-				i64, err := prefixCoded.Int64()
-				if err == nil {
-					lons = append(lons, geo.MortonUnhashLon(uint64(i64)))
-					lats = append(lats, geo.MortonUnhashLat(uint64(i64)))
-					found = true
-				}
-			}
-		})
-
+		lons, lats = lons[:0], lats[:0]
+		found = false
 		// Note: this approach works for points which are strictly inside
 		// the polygon. ie it might fail for certain points on the polygon boundaries.
-		if err == nil && found {
+		if err := dvReader.VisitDocValues(d.IndexInternalID, dvVisitor); err == nil && found {
 			bytes := dvReader.BytesRead()
 			if bytes > 0 {
 				reportIOStats(ctx, bytes)
@@ -116,10 +125,6 @@ func buildPolygonFilter(ctx context.Context, dvReader index.DocValueReader, fiel
 		if len(coordinates) < 3 {
 			return false
 		}
-		rayIntersectsSegment := func(point, a, b geo.Point) bool {
-			return (a.Lat > point.Lat) != (b.Lat > point.Lat) &&
-				point.Lon < (b.Lon-a.Lon)*(point.Lat-a.Lat)/(b.Lat-a.Lat)+a.Lon
-		}
 
 		for i := range lons {
 			pt := geo.Point{Lon: lons[i], Lat: lats[i]}
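Note: the hoisted rayIntersectsSegment is the crossing test of the even-odd (PNPOLY) point-in-polygon algorithm referenced above: a point is inside when a horizontal ray cast from it crosses the polygon's edges an odd number of times. A minimal standalone sketch of how the filter presumably consumes it; the containment loop below is illustrative, since the diff only shows the loop's first two lines:

package main

import "fmt"

type point struct{ lon, lat float64 }

// rayIntersectsSegment reports whether a horizontal ray cast from
// pt toward +lon crosses the edge (a, b); this is the same test as
// the filter's hoisted closure.
func rayIntersectsSegment(pt, a, b point) bool {
	return (a.lat > pt.lat) != (b.lat > pt.lat) &&
		pt.lon < (b.lon-a.lon)*(pt.lat-a.lat)/(b.lat-a.lat)+a.lon
}

// inside applies the even-odd rule: an odd number of edge crossings
// means the point is strictly inside the polygon.
func inside(pt point, poly []point) bool {
	if len(poly) < 3 {
		return false
	}
	crossings := 0
	for i := range poly {
		a := poly[i]
		b := poly[(i+1)%len(poly)]
		if rayIntersectsSegment(pt, a, b) {
			crossings++
		}
	}
	return crossings%2 == 1
}

func main() {
	square := []point{{0, 0}, {4, 0}, {4, 4}, {0, 4}}
	fmt.Println(inside(point{2, 2}, square)) // true
	fmt.Println(inside(point{5, 2}, square)) // false
}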