207 changes: 207 additions & 0 deletions faiss_vector_gpu.go
@@ -0,0 +1,207 @@
// Copyright (c) 2025 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build vectors
// +build vectors

package zap

import (
	"math"
	"math/rand/v2"
	"sync"

	faiss "github.com/blevesearch/go-faiss"
)

var (
	// NumGPUs is the number of available GPU devices
	NumGPUs int
	// GPULocks is a slice of mutexes for synchronizing access to GPU resources,
	// primarily used to serialize calls to TransferToGPU and TransferToCPU
	GPULocks []*sync.Mutex
)

func init() {
	n, err := faiss.NumGPUs()
	if err != nil {
		NumGPUs = 0
		return
	}
	NumGPUs = n
	GPULocks = make([]*sync.Mutex, NumGPUs)
	for i := 0; i < NumGPUs; i++ {
		GPULocks[i] = &sync.Mutex{}
	}
}

const (
	// GPUIndexMinVectorsForTransfer is the minimum number of vectors
	// required to consider transferring an IVF index to GPU for training.
	// Smaller indexes may not benefit from GPU acceleration
	// due to transfer overheads.
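	// For scale (illustrative numbers, not from this PR): 30,000
	// 128-dimensional float32 vectors amount to roughly 15 MB of raw data
	// (30000 * 128 * 4 bytes), small enough that CPU training is
	// typically the better choice.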
	GPUIndexMinVectorsForTransfer = 30000

	// GPUTransferOverheadFactor accounts for additional
	// memory overhead involved during GPU index transfer,
	// such as data transfer costs and temporary allocations.
	GPUTransferOverheadFactor = 1.4

	// SoftmaxTemperature controls the sharpness of the probability distribution
	// when selecting a GPU based on available free memory.
	// Lower values (<1) make the selection more deterministic (favoring the GPU
	// with the most free memory), while higher values (>1) make it more random.
	SoftmaxTemperature = 0.5
)

type gpuInfo struct {
	id      int
	freeMem float64 // in bytes
}

// GetDeviceID returns a device ID between 0 and NumGPUs-1 (inclusive) to be
// used for distributing work across multiple GPUs. It returns -1 if no GPUs
// are available or an error occurs. The selection favors GPUs with more free
// memory, using a softmax-based probabilistic selection to help spread load
// across multiple GPUs when more than two are available.
func GetDeviceID() int {
	// simple case: no GPUs available
	if NumGPUs == 0 {
		return -1
	}

	var gpus []*gpuInfo
	for i := 0; i < NumGPUs; i++ {
		freeMem, err := faiss.FreeMemory(i)
		if err != nil {
			continue
		}
		gpus = append(gpus, &gpuInfo{id: i, freeMem: float64(freeMem)})
	}

	if len(gpus) == 0 {
		// memory info was unavailable for every device, so even though
		// NumGPUs > 0 we cannot make a reliable device selection;
		// return -1 to indicate no usable GPUs
		return -1
	}

	var maxGPU *gpuInfo
	for _, g := range gpus {
		if maxGPU == nil || g.freeMem > maxGPU.freeMem {
			maxGPU = g
		}
	}

	// if all the GPUs are full, return -1
	if maxGPU.freeMem == 0 {
		return -1
	}

	// with two or fewer GPUs, just pick the one with the most free memory
	if len(gpus) <= 2 {
		return maxGPU.id
	}

	// with more than two GPUs, apply softmax weighting over free memory to
	// probabilistically pick a GPU. This helps spread load more evenly across
	// devices while still favoring those with more available resources,
	// mainly to avoid always picking the same GPU when several have similar
	// free memory.
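	// For example (illustrative numbers, not from this PR): with free
	// memory of 8 GB, 6 GB, and 2 GB and SoftmaxTemperature = 0.5, the
	// exponents are exp(0), exp(-0.5), and exp(-1.5) (roughly 1.00, 0.61,
	// and 0.22), giving selection probabilities of about 55%, 33%, and
	// 12% respectively.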
	expVals := make([]float64, len(gpus))
	var sumExp float64
	for i, g := range gpus {
		val := math.Exp((g.freeMem - maxGPU.freeMem) / (SoftmaxTemperature * maxGPU.freeMem))
		expVals[i] = val
		sumExp += val
	}

	// compute the cumulative distribution and sample from it
	// (inverse-transform sampling)
	r := rand.Float64()
	cumProb := 0.0
	for i, g := range gpus {
		cumProb += expVals[i] / sumExp
		if r <= cumProb {
			return g.id
		}
	}

	// fallback to the GPU with the most free memory
	return maxGPU.id
}

// TrainIndex trains the given FAISS index using the provided vectors and dimensions.
// If useGPU is true and a suitable GPU is available, training is performed on the GPU;
// otherwise, training falls back to the CPU. The function returns the trained index,
// which may be a new instance if GPU training and the transfer back to CPU succeed.
func TrainIndex(index *faiss.IndexImpl, vecs []float32, dims int, useGPU bool) (*faiss.IndexImpl, error) {
	// trainCPU trains the index on the CPU; it is used as the fallback
	// whenever GPU training is unavailable or fails
	trainCPU := func() (*faiss.IndexImpl, error) {
		err := index.Train(vecs)
		return index, err
	}
	// decide whether to use the GPU for training; the vector count is
	// len(vecs)/dims since vecs is a flattened array
	if !useGPU || NumGPUs == 0 || len(vecs)/dims < GPUIndexMinVectorsForTransfer {
		return trainCPU()
	}
	// attempt GPU training
	deviceID := GetDeviceID()
	if deviceID == -1 {
		// no GPUs available, fall back to CPU training
		return trainCPU()
	}
	// lock the selected GPU for the duration of the transfer
	GPULocks[deviceID].Lock()
	// check whether enough free memory is available
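	// the estimate is raw vector bytes times GPUTransferOverheadFactor;
	// for example (illustrative numbers, not from this PR), 1M 128-dim
	// float32 vectors are ~512 MB raw, or ~717 MB after the 1.4x factor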
	estimatedMemNeeded := uint64(float64(len(vecs)*SizeOfFloat32) * GPUTransferOverheadFactor) // input vectors + overhead
	freeMem, err := faiss.FreeMemory(deviceID)
	if err != nil || freeMem < estimatedMemNeeded {
		// unable to get free memory info, or not enough free memory;
		// fall back to CPU training
		GPULocks[deviceID].Unlock()
		return trainCPU()
	}
	// transfer the index to the GPU
	gpuIndex, err := faiss.TransferToGPU(index, deviceID)
	if err != nil {
		// transfer failed, fall back to CPU training
		GPULocks[deviceID].Unlock()
		return trainCPU()
	}
	// release the lock during training; it only guards the transfers
	GPULocks[deviceID].Unlock()
	// train on the GPU
	err = gpuIndex.Train(vecs)
	if err != nil {
		// training failed, fall back to CPU training
		gpuIndex.Close()
		return trainCPU()
	}
	// acquire the lock again for the transfer back to the CPU
	GPULocks[deviceID].Lock()
	// transfer back to the CPU
	cpuIndex, err := faiss.TransferToCPU(gpuIndex)
	gpuIndex.Close()
	if err != nil {
		GPULocks[deviceID].Unlock()
		// transfer back failed, fall back to CPU training
		return trainCPU()
	}
	GPULocks[deviceID].Unlock()
	// GPU training and the transfer back to CPU succeeded:
	// free the original index and return the new trained one
	index.Close()
	return cpuIndex, nil
}
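For context, a minimal sketch of how TrainIndex is meant to be called; faiss.IndexFactory and faiss.MetricL2 come from the go-faiss bindings, while the sizes and the "IVF64,Flat" description string are illustrative assumptions, not part of this change:

	// exampleTrainIVF is an illustrative sketch, not part of this PR: it
	// builds a small IVF index and trains it via TrainIndex, which picks
	// a GPU when one is available and falls back to the CPU otherwise.
	func exampleTrainIVF() (*faiss.IndexImpl, error) {
		dims, nvecs := 128, 50000           // 50k vectors clears GPUIndexMinVectorsForTransfer
		vecs := make([]float32, nvecs*dims) // flattened row-major vectors; fill with real data
		idx, err := faiss.IndexFactory(dims, "IVF64,Flat", faiss.MetricL2)
		if err != nil {
			return nil, err
		}
		// on the GPU path the input index is closed and a new trained
		// index is returned; on the CPU path the same index is returned
		return TrainIndex(idx, vecs, dims, true)
	}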
42 changes: 42 additions & 0 deletions faiss_vector_gpu_test.go
@@ -0,0 +1,42 @@
// Copyright (c) 2025 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build vectors
// +build vectors

package zap

import (
	"testing"

	"github.com/blevesearch/go-faiss"
)

func TestNumGPUs(t *testing.T) {
	_, err := faiss.NumGPUs()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}

func TestGetDeviceID(t *testing.T) {
	id := GetDeviceID()
	if NumGPUs == 0 {
		if id != -1 {
			t.Fatalf("expected -1 device ID when no GPUs available, got: %d", id)
		}
	} else if id < 0 || id >= NumGPUs {
		t.Fatalf("expected device ID between 0 and %d, got: %d", NumGPUs-1, id)
	}
}
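A possible follow-up test, sketched under the assumption that faiss.IndexFactory, faiss.MetricL2, and IsTrained behave as in the upstream go-faiss bindings; the sizes and the "IVF4,Flat" description are illustrative:

	func TestTrainIndexCPUFallback(t *testing.T) {
		dims, nvecs := 4, 1000 // below GPUIndexMinVectorsForTransfer, so CPU training is expected
		vecs := make([]float32, nvecs*dims)
		for i := range vecs {
			vecs[i] = float32(i % 7) // arbitrary, non-constant training data
		}
		idx, err := faiss.IndexFactory(dims, "IVF4,Flat", faiss.MetricL2)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		trained, err := TrainIndex(idx, vecs, dims, true)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		defer trained.Close()
		if !trained.IsTrained() {
			t.Fatalf("expected index to be trained")
		}
	}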
64 changes: 45 additions & 19 deletions section_faiss_vector_index.go
@@ -174,7 +174,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 		if err != nil {
 			return err
 		}
-		err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, w, closeCh)
+		err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, fieldName, w, closeCh)
 		if err != nil {
 			return err
 		}
@@ -273,7 +273,7 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 {
 // todo: naive implementation. need to keep in mind the perf implications and improve on this.
 // perhaps, parallelized merging can help speed things up over here.
 func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
-	vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error {
+	vecIndexes []*vecIndexInfo, fieldName string, w *CountHashWriter, closeCh chan struct{}) error {
 
 	// safe to assume that all the indexes are of the same config values, given
 	// that they are extracted from the field mapping info.
@@ -324,6 +324,9 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
 		return nil
 	}
 
+	// assign gpu value from the field options
+	gpu := v.fieldsOptions[fieldName].UseGPU()
+
 	finalVecIDs := make([]int64, 0, finalVecIDCap)
 	// merging of indexes with reconstruction method.
 	// the indexes[i].vecIds has only the valid vecs of this vector
@@ -380,25 +383,33 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
 	if err != nil {
 		return err
 	}
-	defer faissIndex.Close()
 
-	if indexClass == IndexTypeIVF {
-		// the direct map maintained in the IVF index is essential for the
-		// reconstruction of vectors based on vector IDs in the future merges.
-		// the AddWithIDs API also needs a direct map to be set before using.
-		err = faissIndex.SetDirectMap(2)
-		if err != nil {
-			return err
+	defer func() {
+		if faissIndex != nil {
+			faissIndex.Close()
 		}
+	}()
+
+	if indexClass == IndexTypeIVF {
 		nprobe := calculateNprobe(nlist, indexOptimizedFor)
 		faissIndex.SetNProbe(nprobe)
 
-		// train the vector index, essentially performs k-means clustering to partition
-		// the data space of indexData such that during the search time, we probe
-		// only a subset of vectors -> non-exhaustive search. could be a time
-		// consuming step when the indexData is large.
-		err = faissIndex.Train(indexData)
+		// best-effort attempt to use the GPU for training, if configured;
+		// may fall back to CPU training if GPU training fails for any
+		// reason, such as insufficient memory or unavailable GPUs
+		faissIndex, err = TrainIndex(faissIndex, indexData, dims, gpu)
 		if err != nil {
 			return err
 		}
 
+		// the direct map maintained in the IVF index is essential for the
+		// reconstruction of vectors based on vector IDs in the future merges.
+		// the AddWithIDs API also needs a direct map to be set before using.
+		err = faissIndex.SetDirectMap(2)
+		if err != nil {
+			return err
+		}
@@ -488,7 +499,8 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint
 	for fieldID, content := range vo.vecFieldMap {
 		// calculate the capacity of the vecs and ids slices
 		// to avoid multiple allocations.
-		vecs := make([]float32, 0, len(content.vecs)*int(content.dim))
+		dims := int(content.dim)
+		vecs := make([]float32, 0, len(content.vecs)*dims)
 		ids := make([]int64, 0, len(content.vecs))
 		for hash, vecInfo := range content.vecs {
 			vecs = append(vecs, vecInfo.vec...)
@@ -501,6 +513,10 @@
 			// use the same FAISS metric for inner product and cosine similarity
 			metric = faiss.MetricInnerProduct
 		}
+		// read vector index options:
+		// was this field configured to use GPU for training?
+		gpu := vo.fieldsOptions[content.name].UseGPU()
 
 		nvecs := len(ids)
 		nlist := determineCentroids(nvecs)
@@ -511,18 +527,25 @@
 			return 0, err
 		}
 
-		defer faissIndex.Close()
+		defer func() {
+			if faissIndex != nil {
+				faissIndex.Close()
+			}
+		}()
 
 		if indexClass == IndexTypeIVF {
-			err = faissIndex.SetDirectMap(2)
+			nprobe := calculateNprobe(nlist, content.indexOptimizedFor)
+			faissIndex.SetNProbe(nprobe)
+			// best-effort attempt to use the GPU for training, if configured;
+			// may fall back to CPU training if GPU training fails for any
+			// reason, such as insufficient memory or unavailable GPUs
+			faissIndex, err = TrainIndex(faissIndex, vecs, dims, gpu)
 			if err != nil {
 				return 0, err
 			}
 
-			nprobe := calculateNprobe(nlist, content.indexOptimizedFor)
-			faissIndex.SetNProbe(nprobe)
-
-			err = faissIndex.Train(vecs)
+			err = faissIndex.SetDirectMap(2)
 			if err != nil {
 				return 0, err
 			}
@@ -621,6 +644,7 @@ func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, do
 	}
 
 	//process field
+	name := field.Name()
 	vec := field.Vector()
 	dim := field.Dims()
 	metric := field.Similarity()
@@ -652,6 +676,7 @@
 			dim:               uint16(dim),
 			metric:            metric,
 			indexOptimizedFor: indexOptimizedFor,
+			name:              name,
 		}
 	} else {
 		vo.vecFieldMap[fieldID].vecs[subVecHash] = &vecInfo{
@@ -702,6 +727,7 @@ func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) reseta
 }
 
 type indexContent struct {
+	name   string
 	vecs   map[int64]*vecInfo
 	dim    uint16
 	metric string