From f52bb04817f709020308435a26ddf2d07c42fe89 Mon Sep 17 00:00:00 2001 From: Aditi Ahuja Date: Thu, 4 Jan 2024 10:56:03 +0530 Subject: [PATCH 1/4] reconstruction for all index types --- section_faiss_vector_index.go | 159 ++++++++++++---------------------- 1 file changed, 54 insertions(+), 105 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 799c16ae..1093a7c5 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -258,27 +258,6 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bit return fieldStart, nil } -// Func which returns if reconstruction is required. -// Won't be required if multiple flat indexes are being combined into a larger -// flat index. -func reconstructionRequired(isNewIndexIVF bool, indexes []*vecIndexMeta) bool { - // Always required for IVF indexes. - if isNewIndexIVF { - return true - } - - // if any existing index is not flat, all need to be reconstructed. - for _, index := range indexes { - _, isExistingIndexIVF := getIndexType(len(index.vecIds) + - len(index.deleted)) - if isExistingIndexIVF { - return true - } - } - - return false -} - func removeDeletedVectors(index *faiss.IndexImpl, ids []int64) error { sel, err := faiss.NewIDSelectorBatch(ids) if err != nil { @@ -321,104 +300,74 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme // index type to be created after merge. indexType, isIVF := getIndexType(len(vecToDocID)) - if reconstructionRequired(isIVF, indexes) { - // merging of indexes with reconstruction - // method. the indexes[i].vecIds is such that it has only the valid vecs - // of this vector index present in it, so we'd be reconstructed only the - // valid ones. - var indexData []float32 - for i := 0; i < len(vecIndexes); i++ { - if isClosed(closeCh) { - return seg.ErrClosed - } - - // reconstruct the vectors only if present, it could be that - // some of the indexes had all of their vectors updated/deleted. - if len(indexes[i].vecIds) > 0 { - // todo: parallelize reconstruction - recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), - indexes[i].vecIds) - if err != nil { - return err - } - indexData = append(indexData, recons...) + // merging of indexes with reconstruction + // method. the indexes[i].vecIds is such that it has only the valid vecs + // of this vector index present in it, so we'd be reconstructed only the + // valid ones. + var indexData []float32 + for i := 0; i < len(vecIndexes); i++ { + if isClosed(closeCh) { + return seg.ErrClosed + } + + // reconstruct the vectors only if present, it could be that + // some of the indexes had all of their vectors updated/deleted. + if len(indexes[i].vecIds) > 0 { + // todo: parallelize reconstruction + recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), + indexes[i].vecIds) + if err != nil { + return err } + indexData = append(indexData, recons...) } + } - if len(indexData) == 0 { - // no valid vectors for this index, so we don't even have to - // record it in the section - return nil - } - - // safe to assume that all the indexes are of the same config values, given - // that they are extracted from the field mapping info. - dims := vecIndexes[0].D() - metric := vecIndexes[0].MetricType() - finalVecIDs := maps.Keys(vecToDocID) - - index, err := faiss.IndexFactory(dims, indexType, metric) - if err != nil { - return err - } - defer index.Close() + if len(indexData) == 0 { + // no valid vectors for this index, so we don't even have to + // record it in the section + return nil + } - if isIVF { - // the direct map maintained in the IVF index is essential for the - // reconstruction of vectors based on vector IDs in the future merges. - // the AddWithIDs API also needs a direct map to be set before using. - err = index.SetDirectMap(2) - if err != nil { - return err - } + // safe to assume that all the indexes are of the same config values, given + // that they are extracted from the field mapping info. + dims := vecIndexes[0].D() + metric := vecIndexes[0].MetricType() + finalVecIDs := maps.Keys(vecToDocID) - // train the vector index, essentially performs k-means clustering to partition - // the data space of indexData such that during the search time, we probe - // only a subset of vectors -> non-exhaustive search. could be a time - // consuming step when the indexData is large. - err = index.Train(indexData) - if err != nil { - return err - } - } + index, err := faiss.IndexFactory(dims, indexType, metric) + if err != nil { + return err + } + defer index.Close() - err = index.AddWithIDs(indexData, finalVecIDs) + if isIVF { + // the direct map maintained in the IVF index is essential for the + // reconstruction of vectors based on vector IDs in the future merges. + // the AddWithIDs API also needs a direct map to be set before using. + err = index.SetDirectMap(2) if err != nil { return err } - mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(index) + // train the vector index, essentially performs k-means clustering to partition + // the data space of indexData such that during the search time, we probe + // only a subset of vectors -> non-exhaustive search. could be a time + // consuming step when the indexData is large. + err = index.Train(indexData) if err != nil { return err } - } else { - var err error - if len(indexes[0].deleted) > 0 { - err = removeDeletedVectors(vecIndexes[0], indexes[0].deleted) - if err != nil { - return err - } - } - for i := 1; i < len(vecIndexes); i++ { - if isClosed(closeCh) { - return seg.ErrClosed - } - if len(indexes[i].deleted) > 0 { - err = removeDeletedVectors(vecIndexes[i], indexes[i].deleted) - if err != nil { - return err - } - } - err = vecIndexes[0].MergeFrom(vecIndexes[i], 0) - if err != nil { - return err - } - } + } - mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(vecIndexes[0]) - if err != nil { - return err - } + err = index.AddWithIDs(indexData, finalVecIDs) + if err != nil { + return err + } + + mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(index) + if err != nil { + return err } fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w) if err != nil { From 272d1d90ac6d8ba75dc7ceb69374795cb3ca4aa7 Mon Sep 17 00:00:00 2001 From: Aditi Ahuja Date: Thu, 4 Jan 2024 14:51:49 +0530 Subject: [PATCH 2/4] correct ordering of vector IDs --- section_faiss_vector_index.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 1093a7c5..9a22c856 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -26,7 +26,6 @@ import ( index "github.com/blevesearch/bleve_index_api" faiss "github.com/blevesearch/go-faiss" seg "github.com/blevesearch/scorch_segment_api/v2" - "golang.org/x/exp/maps" ) func init() { @@ -300,6 +299,8 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme // index type to be created after merge. indexType, isIVF := getIndexType(len(vecToDocID)) + var finalVecIDs []int64 + // merging of indexes with reconstruction // method. the indexes[i].vecIds is such that it has only the valid vecs // of this vector index present in it, so we'd be reconstructed only the @@ -320,6 +321,8 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme return err } indexData = append(indexData, recons...) + // Adding vector IDs in the same order as the vectors + finalVecIDs = append(finalVecIDs, indexes[i].vecIds...) } } @@ -333,7 +336,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme // that they are extracted from the field mapping info. dims := vecIndexes[0].D() metric := vecIndexes[0].MetricType() - finalVecIDs := maps.Keys(vecToDocID) index, err := faiss.IndexFactory(dims, indexType, metric) if err != nil { From 53bdc9cd24e7685308394272aaba1e9ee0934184 Mon Sep 17 00:00:00 2001 From: Aditi Ahuja Date: Thu, 4 Jan 2024 15:15:33 +0530 Subject: [PATCH 3/4] remove unused code --- section_faiss_vector_index.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 9a22c856..d21506ae 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -257,20 +257,6 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bit return fieldStart, nil } -func removeDeletedVectors(index *faiss.IndexImpl, ids []int64) error { - sel, err := faiss.NewIDSelectorBatch(ids) - if err != nil { - return err - } - - defer sel.Delete() - _, err = index.RemoveIDs(sel) - if err != nil { - return err - } - return nil -} - // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase, From 0194e61f1d4e64fb2fce1500f70535447ea2e8c1 Mon Sep 17 00:00:00 2001 From: Aditi Ahuja Date: Fri, 5 Jan 2024 11:32:15 +0530 Subject: [PATCH 4/4] fixed commentary --- section_faiss_vector_index.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index d21506ae..4df6af62 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -287,10 +287,9 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme var finalVecIDs []int64 - // merging of indexes with reconstruction - // method. the indexes[i].vecIds is such that it has only the valid vecs - // of this vector index present in it, so we'd be reconstructed only the - // valid ones. + // merging of indexes with reconstruction method. + // the indexes[i].vecIds has only the valid vecs of this vector + // index present in it, so we'd be reconstructing only those. var indexData []float32 for i := 0; i < len(vecIndexes); i++ { if isClosed(closeCh) {