diff --git a/faiss_vector_gpu.go b/faiss_vector_gpu.go new file mode 100644 index 00000000..b4317343 --- /dev/null +++ b/faiss_vector_gpu.go @@ -0,0 +1,207 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build vectors +// +build vectors + +package zap + +import ( + "math" + "math/rand/v2" + "sync" + + faiss "github.com/blevesearch/go-faiss" +) + +var ( + // NumGPUs is the number of available GPU devices + NumGPUs int + // GPULocks is a slice of mutexes for synchronizing access to GPU resources + // Primarily used for synchronizing calls to TransferToGPU and TransferToCPU + GPULocks []*sync.Mutex +) + +func init() { + n, err := faiss.NumGPUs() + if err != nil { + NumGPUs = 0 + return + } + NumGPUs = n + GPULocks = make([]*sync.Mutex, NumGPUs) + for i := 0; i < NumGPUs; i++ { + GPULocks[i] = &sync.Mutex{} + } +} + +const ( + // GPUIndexMinVectorsForTransfer is the minimum number of vectors + // required to consider transferring an IVF index to GPU for training + // Smaller indexes may not benefit from GPU acceleration + // due to transfer overheads + GPUIndexMinVectorsForTransfer = 30000 + + // GPUTransferOverheadFactor accounts for additional + // memory overhead involved during GPU index transfer, + // such as data transfer costs and temporary allocations + GPUTransferOverheadFactor = 1.4 + + // SoftmaxTemperature controls the sharpness of the probability distribution + // when selecting a GPU based on available free memory. + // Lower values (<1) make the selection more deterministic (favoring the GPU + // with the most free memory), while higher values (>1) make it more random + SoftmaxTemperature = 0.5 +) + +type gpuInfo struct { + id int + freeMem float64 // in bytes +} + +// GetDeviceID returns a device ID between 0 and NumGPUs-1 (inclusive) to be used for distributing work across multiple GPUs. +// It returns -1 if no GPUs are available or an error occurs. The selection algorithm favors GPUs with more free memory, +// using a softmax-based probabilistic selection to help spread load across multiple GPUs when more than two GPUs are available. +func GetDeviceID() int { + // simple cases + // no GPUs available, return -1 + if NumGPUs == 0 { + return -1 + } + + var gpus []*gpuInfo + for i := 0; i < NumGPUs; i++ { + freeMem, err := faiss.FreeMemory(i) + if err != nil { + continue + } + gpus = append(gpus, &gpuInfo{id: i, freeMem: float64(freeMem)}) + } + + if len(gpus) == 0 { + // fallback if no memory info available + // return -1 to indicate no GPUs available + // even though NumGPUs > 0 as we couldn't + // get memory info and thus cannot provide a + // reliable device selection + return -1 + } + + var maxGPU *gpuInfo + for _, g := range gpus { + if maxGPU == nil || g.freeMem > maxGPU.freeMem { + maxGPU = g + } + } + + // if all the GPUs are full, return -1 + if maxGPU.freeMem == 0 { + return -1 + } + + // if only two or fewer GPUs, just pick the one with the most free memory + if len(gpus) <= 2 { + return maxGPU.id + } + + // more than two GPUs, do softmax weighting based on free memory + // to probabilistically pick a GPU, favoring those with more free memory + // this helps spread load more evenly across multiple GPUs, while still + // favoring those with more available resources, mainly to avoid + // always picking the same GPU when multiple GPUs have similar free memory + expVals := make([]float64, len(gpus)) + var sumExp float64 + for i, g := range gpus { + val := math.Exp((g.freeMem - maxGPU.freeMem) / (SoftmaxTemperature * maxGPU.freeMem)) + expVals[i] = val + sumExp += val + } + + // Compute cumulative distribution and sample. + r := rand.Float64() + cumProb := 0.0 + for i, g := range gpus { + cumProb += expVals[i] / sumExp + if r <= cumProb { + return g.id + } + } + + // Fallback to max GPU + return maxGPU.id +} + +// TrainIndex trains the given FAISS index using the provided vectors and dimensions. +// If useGPU is true and a suitable GPU is available, training is performed on the GPU; +// otherwise, training falls back to the CPU. The function returns the trained index, +// which may be a new instance if GPU training and transfer back to CPU succeed. +func TrainIndex(index *faiss.IndexImpl, vecs []float32, dims int, useGPU bool) (*faiss.IndexImpl, error) { + // function to train index on CPU used as fallback on GPU failures + TrainCPU := func() (*faiss.IndexImpl, error) { + err := index.Train(vecs) + return index, err + } + // decide whether to use GPU for training + if !useGPU || NumGPUs == 0 || len(vecs)/dims < GPUIndexMinVectorsForTransfer { + // use CPU training + return TrainCPU() + } + // attempt GPU training + deviceID := GetDeviceID() + if deviceID == -1 { + // no GPUs available, fallback to CPU training + return TrainCPU() + } + // lock the selected GPU for the duration of the transfer + GPULocks[deviceID].Lock() + // check if enough free memory is available + estimatedMemNeeded := uint64(float64(len(vecs)*SizeOfFloat32) * GPUTransferOverheadFactor) // input vectors + overhead + freeMem, err := faiss.FreeMemory(deviceID) + if err != nil || freeMem < estimatedMemNeeded { + // unable to get free memory info or not enough free memory, + // fallback to CPU training + GPULocks[deviceID].Unlock() + return TrainCPU() + } + // transfer index to GPU + gpuIndex, err := faiss.TransferToGPU(index, deviceID) + if err != nil { + // transfer failed, fallback to CPU training + GPULocks[deviceID].Unlock() + return TrainCPU() + } + GPULocks[deviceID].Unlock() + // train on GPU + err = gpuIndex.Train(vecs) + if err != nil { + // training failed, fallback to CPU training + gpuIndex.Close() + return TrainCPU() + } + // acquire lock again for transfer back to CPU + GPULocks[deviceID].Lock() + // transfer back to CPU + cpuIndex, err := faiss.TransferToCPU(gpuIndex) + gpuIndex.Close() + if err != nil { + GPULocks[deviceID].Unlock() + // transfer back failed, fallback to CPU training + return TrainCPU() + } + GPULocks[deviceID].Unlock() + // successful GPU training and transfer back to CPU + // now free the original index and return the new trained index + index.Close() + return cpuIndex, nil +} diff --git a/faiss_vector_gpu_test.go b/faiss_vector_gpu_test.go new file mode 100644 index 00000000..b2858404 --- /dev/null +++ b/faiss_vector_gpu_test.go @@ -0,0 +1,42 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build vectors +// +build vectors + +package zap + +import ( + "testing" + + "github.com/blevesearch/go-faiss" +) + +func TestNumGPUs(t *testing.T) { + _, err := faiss.NumGPUs() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestGetDeviceID(t *testing.T) { + id := GetDeviceID() + if NumGPUs == 0 { + if id != -1 { + t.Fatalf("expected -1 device ID when no GPUs available, got: %d", id) + } + } else if id < 0 || id >= NumGPUs { + t.Fatalf("expected device ID between 0 and %d, got: %d", NumGPUs-1, id) + } +} diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 3a6e8fc0..c095ca7e 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -174,7 +174,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se if err != nil { return err } - err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, w, closeCh) + err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, fieldName, w, closeCh) if err != nil { return err } @@ -273,7 +273,7 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 { // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, - vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error { + vecIndexes []*vecIndexInfo, fieldName string, w *CountHashWriter, closeCh chan struct{}) error { // safe to assume that all the indexes are of the same config values, given // that they are extracted from the field mapping info. @@ -324,6 +324,9 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, return nil } + // assign gpu value from the field options + gpu := v.fieldsOptions[fieldName].UseGPU() + finalVecIDs := make([]int64, 0, finalVecIDCap) // merging of indexes with reconstruction method. // the indexes[i].vecIds has only the valid vecs of this vector @@ -380,17 +383,13 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, if err != nil { return err } - defer faissIndex.Close() - - if indexClass == IndexTypeIVF { - // the direct map maintained in the IVF index is essential for the - // reconstruction of vectors based on vector IDs in the future merges. - // the AddWithIDs API also needs a direct map to be set before using. - err = faissIndex.SetDirectMap(2) - if err != nil { - return err + defer func() { + if faissIndex != nil { + faissIndex.Close() } + }() + if indexClass == IndexTypeIVF { nprobe := calculateNprobe(nlist, indexOptimizedFor) faissIndex.SetNProbe(nprobe) @@ -398,7 +397,19 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // the data space of indexData such that during the search time, we probe // only a subset of vectors -> non-exhaustive search. could be a time // consuming step when the indexData is large. - err = faissIndex.Train(indexData) + // Best effort attempt to use GPU for training if configured + // May fall back to CPU training if GPU training fails due + // to any reason such as insufficient memory, or unavailability + // of GPUs, etc. + faissIndex, err = TrainIndex(faissIndex, indexData, dims, gpu) + if err != nil { + return err + } + + // the direct map maintained in the IVF index is essential for the + // reconstruction of vectors based on vector IDs in the future merges. + // the AddWithIDs API also needs a direct map to be set before using. + err = faissIndex.SetDirectMap(2) if err != nil { return err } @@ -488,7 +499,8 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint for fieldID, content := range vo.vecFieldMap { // calculate the capacity of the vecs and ids slices // to avoid multiple allocations. - vecs := make([]float32, 0, len(content.vecs)*int(content.dim)) + dims := int(content.dim) + vecs := make([]float32, 0, len(content.vecs)*dims) ids := make([]int64, 0, len(content.vecs)) for hash, vecInfo := range content.vecs { vecs = append(vecs, vecInfo.vec...) @@ -501,6 +513,10 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // use the same FAISS metric for inner product and cosine similarity metric = faiss.MetricInnerProduct } + // read vector index options: + // was this field configured + // to use GPU for training? + gpu := vo.fieldsOptions[content.name].UseGPU() nvecs := len(ids) nlist := determineCentroids(nvecs) @@ -511,18 +527,25 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint return 0, err } - defer faissIndex.Close() + defer func() { + if faissIndex != nil { + faissIndex.Close() + } + }() if indexClass == IndexTypeIVF { - err = faissIndex.SetDirectMap(2) + nprobe := calculateNprobe(nlist, content.indexOptimizedFor) + faissIndex.SetNProbe(nprobe) + // Best effort attempt to use GPU for training if configured + // May fall back to CPU training if GPU training fails due + // to any reason such as insufficient memory, or unavailability + // of GPUs, etc. + faissIndex, err = TrainIndex(faissIndex, vecs, dims, gpu) if err != nil { return 0, err } - nprobe := calculateNprobe(nlist, content.indexOptimizedFor) - faissIndex.SetNProbe(nprobe) - - err = faissIndex.Train(vecs) + err = faissIndex.SetDirectMap(2) if err != nil { return 0, err } @@ -621,6 +644,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do } //process field + name := field.Name() vec := field.Vector() dim := field.Dims() metric := field.Similarity() @@ -652,6 +676,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do dim: uint16(dim), metric: metric, indexOptimizedFor: indexOptimizedFor, + name: name, } } else { vo.vecFieldMap[fieldID].vecs[subVecHash] = &vecInfo{ @@ -702,6 +727,7 @@ func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) reseta } type indexContent struct { + name string vecs map[int64]*vecInfo dim uint16 metric string