diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 799c16ae..4df6af62 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -26,7 +26,6 @@ import ( index "github.com/blevesearch/bleve_index_api" faiss "github.com/blevesearch/go-faiss" seg "github.com/blevesearch/scorch_segment_api/v2" - "golang.org/x/exp/maps" ) func init() { @@ -258,41 +257,6 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bit return fieldStart, nil } -// Func which returns if reconstruction is required. -// Won't be required if multiple flat indexes are being combined into a larger -// flat index. -func reconstructionRequired(isNewIndexIVF bool, indexes []*vecIndexMeta) bool { - // Always required for IVF indexes. - if isNewIndexIVF { - return true - } - - // if any existing index is not flat, all need to be reconstructed. - for _, index := range indexes { - _, isExistingIndexIVF := getIndexType(len(index.vecIds) + - len(index.deleted)) - if isExistingIndexIVF { - return true - } - } - - return false -} - -func removeDeletedVectors(index *faiss.IndexImpl, ids []int64) error { - sel, err := faiss.NewIDSelectorBatch(ids) - if err != nil { - return err - } - - defer sel.Delete() - _, err = index.RemoveIDs(sel) - if err != nil { - return err - } - return nil -} - // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase, @@ -321,105 +285,77 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme // index type to be created after merge. indexType, isIVF := getIndexType(len(vecToDocID)) - if reconstructionRequired(isIVF, indexes) { - // merging of indexes with reconstruction - // method. the indexes[i].vecIds is such that it has only the valid vecs - // of this vector index present in it, so we'd be reconstructed only the - // valid ones. - var indexData []float32 - for i := 0; i < len(vecIndexes); i++ { - if isClosed(closeCh) { - return seg.ErrClosed - } - - // reconstruct the vectors only if present, it could be that - // some of the indexes had all of their vectors updated/deleted. - if len(indexes[i].vecIds) > 0 { - // todo: parallelize reconstruction - recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), - indexes[i].vecIds) - if err != nil { - return err - } - indexData = append(indexData, recons...) - } - } - - if len(indexData) == 0 { - // no valid vectors for this index, so we don't even have to - // record it in the section - return nil - } - - // safe to assume that all the indexes are of the same config values, given - // that they are extracted from the field mapping info. - dims := vecIndexes[0].D() - metric := vecIndexes[0].MetricType() - finalVecIDs := maps.Keys(vecToDocID) + var finalVecIDs []int64 - index, err := faiss.IndexFactory(dims, indexType, metric) - if err != nil { - return err + // merging of indexes with reconstruction method. + // the indexes[i].vecIds has only the valid vecs of this vector + // index present in it, so we'd be reconstructing only those. + var indexData []float32 + for i := 0; i < len(vecIndexes); i++ { + if isClosed(closeCh) { + return seg.ErrClosed } - defer index.Close() - if isIVF { - // the direct map maintained in the IVF index is essential for the - // reconstruction of vectors based on vector IDs in the future merges. - // the AddWithIDs API also needs a direct map to be set before using. - err = index.SetDirectMap(2) - if err != nil { - return err - } - - // train the vector index, essentially performs k-means clustering to partition - // the data space of indexData such that during the search time, we probe - // only a subset of vectors -> non-exhaustive search. could be a time - // consuming step when the indexData is large. - err = index.Train(indexData) + // reconstruct the vectors only if present, it could be that + // some of the indexes had all of their vectors updated/deleted. + if len(indexes[i].vecIds) > 0 { + // todo: parallelize reconstruction + recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), + indexes[i].vecIds) if err != nil { return err } + indexData = append(indexData, recons...) + // Adding vector IDs in the same order as the vectors + finalVecIDs = append(finalVecIDs, indexes[i].vecIds...) } + } - err = index.AddWithIDs(indexData, finalVecIDs) - if err != nil { - return err - } + if len(indexData) == 0 { + // no valid vectors for this index, so we don't even have to + // record it in the section + return nil + } + + // safe to assume that all the indexes are of the same config values, given + // that they are extracted from the field mapping info. + dims := vecIndexes[0].D() + metric := vecIndexes[0].MetricType() - mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(index) + index, err := faiss.IndexFactory(dims, indexType, metric) + if err != nil { + return err + } + defer index.Close() + + if isIVF { + // the direct map maintained in the IVF index is essential for the + // reconstruction of vectors based on vector IDs in the future merges. + // the AddWithIDs API also needs a direct map to be set before using. + err = index.SetDirectMap(2) if err != nil { return err } - } else { - var err error - if len(indexes[0].deleted) > 0 { - err = removeDeletedVectors(vecIndexes[0], indexes[0].deleted) - if err != nil { - return err - } - } - for i := 1; i < len(vecIndexes); i++ { - if isClosed(closeCh) { - return seg.ErrClosed - } - if len(indexes[i].deleted) > 0 { - err = removeDeletedVectors(vecIndexes[i], indexes[i].deleted) - if err != nil { - return err - } - } - err = vecIndexes[0].MergeFrom(vecIndexes[i], 0) - if err != nil { - return err - } - } - mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(vecIndexes[0]) + // train the vector index, essentially performs k-means clustering to partition + // the data space of indexData such that during the search time, we probe + // only a subset of vectors -> non-exhaustive search. could be a time + // consuming step when the indexData is large. + err = index.Train(indexData) if err != nil { return err } } + + err = index.AddWithIDs(indexData, finalVecIDs) + if err != nil { + return err + } + + mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(index) + if err != nil { + return err + } fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w) if err != nil { return err