Skip to content
7 changes: 6 additions & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int
// do not use real config, as these are options for the builder,
// not the resulting index
meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{})
err = meta.Save(path)

writer, err := util.NewFileWriter([]byte(metaFilename))
if err != nil {
return nil, err
}
err = meta.Save(path, writer)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,9 @@ type SynonymIndex interface {
// IndexSynonym indexes a synonym definition, with the specified id and belonging to the specified collection.
IndexSynonym(id string, collection string, definition *SynonymDefinition) error
}

type CustomizableIndex interface {
Index
KeysInUse() (map[string]struct{}, error)
DropKeys(ids map[string]struct{}) error
}
27 changes: 20 additions & 7 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,19 @@ OUTER:
}

startTime := time.Now()

var err error
// lets get started
err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
ourSnapshot)
if ctrlMsg.plan == nil {
err = s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
ourSnapshot)
} else {
cw := newCloseChWrapper(s.closeCh, ctrlMsg.ctx)
defer cw.close()
go cw.listen()

err = s.executePlanMergeAtSnapshot(ctrlMsg.plan, cw)
}

if err != nil {
atomic.StoreUint64(&s.iStats.mergeEpoch, 0)
if err == segment.ErrClosed {
Expand Down Expand Up @@ -161,6 +170,7 @@ OUTER:
type mergerCtrl struct {
ctx context.Context
options *mergeplan.MergePlanOptions
plan *mergeplan.MergePlan
doneCh chan struct{}
}

Expand Down Expand Up @@ -301,15 +311,18 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,

atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))

// process tasks in serial for now
var filenames []string

cw := newCloseChWrapper(s.closeCh, ctx)
defer cw.close()

go cw.listen()

for _, task := range resultMergePlan.Tasks {
return s.executePlanMergeAtSnapshot(resultMergePlan, cw)
}

func (s *Scorch) executePlanMergeAtSnapshot(plan *mergeplan.MergePlan, cw *closeChWrapper) error {
var filenames []string

for _, task := range plan.Tasks {
if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
continue
Expand Down
Loading