Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MB-60269 - Merge path fixes #202

Merged
merged 4 commits into from
Jan 5, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 55 additions & 118 deletions section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
index "github.com/blevesearch/bleve_index_api"
faiss "github.com/blevesearch/go-faiss"
seg "github.com/blevesearch/scorch_segment_api/v2"
"golang.org/x/exp/maps"
)

func init() {
Expand Down Expand Up @@ -258,41 +257,6 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bit
return fieldStart, nil
}

// Func which returns if reconstruction is required.
// Won't be required if multiple flat indexes are being combined into a larger
// flat index.
func reconstructionRequired(isNewIndexIVF bool, indexes []*vecIndexMeta) bool {
// Always required for IVF indexes.
if isNewIndexIVF {
return true
}

// if any existing index is not flat, all need to be reconstructed.
for _, index := range indexes {
_, isExistingIndexIVF := getIndexType(len(index.vecIds) +
len(index.deleted))
if isExistingIndexIVF {
return true
}
}

return false
}

func removeDeletedVectors(index *faiss.IndexImpl, ids []int64) error {
sel, err := faiss.NewIDSelectorBatch(ids)
if err != nil {
return err
}

defer sel.Delete()
_, err = index.RemoveIDs(sel)
if err != nil {
return err
}
return nil
}

// todo: naive implementation. need to keep in mind the perf implications and improve on this.
// perhaps, parallelized merging can help speed things up over here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase,
Expand Down Expand Up @@ -321,105 +285,78 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme
// index type to be created after merge.
indexType, isIVF := getIndexType(len(vecToDocID))

if reconstructionRequired(isIVF, indexes) {
// merging of indexes with reconstruction
// method. the indexes[i].vecIds is such that it has only the valid vecs
// of this vector index present in it, so we'd be reconstructed only the
// valid ones.
var indexData []float32
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return seg.ErrClosed
}

// reconstruct the vectors only if present, it could be that
// some of the indexes had all of their vectors updated/deleted.
if len(indexes[i].vecIds) > 0 {
// todo: parallelize reconstruction
recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)),
indexes[i].vecIds)
if err != nil {
return err
}
indexData = append(indexData, recons...)
}
}

if len(indexData) == 0 {
// no valid vectors for this index, so we don't even have to
// record it in the section
return nil
}

// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
finalVecIDs := maps.Keys(vecToDocID)
var finalVecIDs []int64

index, err := faiss.IndexFactory(dims, indexType, metric)
if err != nil {
return err
// merging of indexes with reconstruction
// method. the indexes[i].vecIds is such that it has only the valid vecs
// of this vector index present in it, so we'd be reconstructed only the
// valid ones.
var indexData []float32
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return seg.ErrClosed
}
defer index.Close()

if isIVF {
// the direct map maintained in the IVF index is essential for the
// reconstruction of vectors based on vector IDs in the future merges.
// the AddWithIDs API also needs a direct map to be set before using.
err = index.SetDirectMap(2)
if err != nil {
return err
}

// train the vector index, essentially performs k-means clustering to partition
// the data space of indexData such that during the search time, we probe
// only a subset of vectors -> non-exhaustive search. could be a time
// consuming step when the indexData is large.
err = index.Train(indexData)
// reconstruct the vectors only if present, it could be that
// some of the indexes had all of their vectors updated/deleted.
if len(indexes[i].vecIds) > 0 {
// todo: parallelize reconstruction
recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)),
indexes[i].vecIds)
if err != nil {
return err
}
indexData = append(indexData, recons...)
// Adding vector IDs in the same order as the vectors
finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
}
}

err = index.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}
if len(indexData) == 0 {
// no valid vectors for this index, so we don't even have to
// record it in the section
return nil
}

// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()

mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(index)
index, err := faiss.IndexFactory(dims, indexType, metric)
if err != nil {
return err
}
defer index.Close()

if isIVF {
// the direct map maintained in the IVF index is essential for the
// reconstruction of vectors based on vector IDs in the future merges.
// the AddWithIDs API also needs a direct map to be set before using.
err = index.SetDirectMap(2)
if err != nil {
return err
}
} else {
var err error
if len(indexes[0].deleted) > 0 {
err = removeDeletedVectors(vecIndexes[0], indexes[0].deleted)
if err != nil {
return err
}
}
for i := 1; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return seg.ErrClosed
}
if len(indexes[i].deleted) > 0 {
err = removeDeletedVectors(vecIndexes[i], indexes[i].deleted)
if err != nil {
return err
}
}
err = vecIndexes[0].MergeFrom(vecIndexes[i], 0)
if err != nil {
return err
}
}

mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(vecIndexes[0])
// train the vector index, essentially performs k-means clustering to partition
// the data space of indexData such that during the search time, we probe
// only a subset of vectors -> non-exhaustive search. could be a time
// consuming step when the indexData is large.
err = index.Train(indexData)
if err != nil {
return err
}
}

err = index.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}

mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(index)
if err != nil {
return err
}
fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w)
if err != nil {
return err
Expand Down
Loading