diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 79840a41f..685f1c921 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -60,6 +60,7 @@ var reflectStaticSizeIndexSnapshot int // exported variable, or at the index level by setting the FieldTFRCacheThreshold // in the kvConfig. var DefaultFieldTFRCacheThreshold uint64 = 10 +var DefaultSynonymTermReaderCacheThreshold uint64 = 10 func init() { var is interface{} = IndexSnapshot{} @@ -87,8 +88,9 @@ type IndexSnapshot struct { m sync.Mutex // Protects the fields that follow. refs int64 - m2 sync.Mutex // Protects the fields that follow. - fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's + m2 sync.Mutex // Protects the fields that follow. + fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's + synonymTermReaders map[string][]*IndexSnapshotSynonymTermReader // keyed by thesaurus name, recycled thesaurus readers } func (i *IndexSnapshot) Segments() []*SegmentSnapshot { @@ -649,6 +651,15 @@ func (is *IndexSnapshot) getFieldTFRCacheThreshold() uint64 { return DefaultFieldTFRCacheThreshold } +func (is *IndexSnapshot) getSynonymTermReaderCacheThreshold() uint64 { + if is.parent.config != nil { + if _, ok := is.parent.config["SynonymTermReaderCacheThreshold"]; ok { + return is.parent.config["SynonymTermReaderCacheThreshold"].(uint64) + } + } + return DefaultSynonymTermReaderCacheThreshold +} + func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) { if !tfr.recycle { // Do not recycle an optimized unadorned term field reader (used for @@ -677,6 +688,25 @@ func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReade is.m2.Unlock() } +func (is *IndexSnapshot) recycleSynonymTermReader(str *IndexSnapshotSynonymTermReader) { + is.parent.rootLock.RLock() + obsolete := is.parent.root != is + is.parent.rootLock.RUnlock() + if obsolete { + // if we're 
not the current root (mutations happened), don't bother recycling + return + } + + is.m2.Lock() + if is.synonymTermReaders == nil { + is.synonymTermReaders = map[string][]*IndexSnapshotSynonymTermReader{} + } + if uint64(len(is.synonymTermReaders[str.name])) < is.getSynonymTermReaderCacheThreshold() { + is.synonymTermReaders[str.name] = append(is.synonymTermReaders[str.name], str) + } + is.m2.Unlock() +} + func docNumberToBytes(buf []byte, in uint64) []byte { if len(buf) != 8 { if cap(buf) >= 8 { @@ -956,3 +986,60 @@ func (is *IndexSnapshot) CloseCopyReader() error { // close the index snapshot normally return is.Close() } + +func (is *IndexSnapshot) allocSynonymTermReader(name string) (str *IndexSnapshotSynonymTermReader) { + is.m2.Lock() + if is.synonymTermReaders != nil { + strs := is.synonymTermReaders[name] + last := len(strs) - 1 + if last >= 0 { + str = strs[last] + strs[last] = nil + is.synonymTermReaders[name] = strs[:last] + is.m2.Unlock() + return + } + } + is.m2.Unlock() + return &IndexSnapshotSynonymTermReader{} +} + +func (is *IndexSnapshot) SynonymTermReader(ctx context.Context, thesaurusName string, term []byte) (index.SynonymTermReader, error) { + rv := is.allocSynonymTermReader(thesaurusName) + + rv.name = thesaurusName + rv.snapshot = is + if rv.postings == nil { + rv.postings = make([]segment.SynonymsList, len(is.segment)) + } + if rv.iterators == nil { + rv.iterators = make([]segment.SynonymsIterator, len(is.segment)) + } + rv.segmentOffset = 0 + + if rv.thesauri == nil { + rv.thesauri = make([]segment.Thesaurus, len(is.segment)) + for i, s := range is.segment { + if synSeg, ok := s.segment.(segment.SynonymSegment); ok { + thes, err := synSeg.Thesaurus(thesaurusName) + if err != nil { + return nil, err + } + rv.thesauri[i] = thes + } + } + } + + for i, s := range is.segment { + if _, ok := s.segment.(segment.SynonymSegment); ok { + pl, err := rv.thesauri[i].SynonymsList(term, s.deleted, rv.postings[i]) + if err != nil { + return nil, err + } + 
rv.postings[i] = pl + + rv.iterators[i] = pl.Iterator(rv.iterators[i]) + } + } + return rv, nil +} diff --git a/index/scorch/snapshot_index_str.go b/index/scorch/snapshot_index_str.go new file mode 100644 index 000000000..5dee83770 --- /dev/null +++ b/index/scorch/snapshot_index_str.go @@ -0,0 +1,82 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scorch + +import ( + "reflect" + + "github.com/blevesearch/bleve/v2/size" + segment "github.com/blevesearch/scorch_segment_api/v2" +) + +var reflectStaticSizeIndexSnapshotSynonymTermReader int + +func init() { + var istr IndexSnapshotSynonymTermReader + reflectStaticSizeIndexSnapshotSynonymTermReader = int(reflect.TypeOf(istr).Size()) +} + +type IndexSnapshotSynonymTermReader struct { + name string + snapshot *IndexSnapshot + thesauri []segment.Thesaurus + postings []segment.SynonymsList + iterators []segment.SynonymsIterator + segmentOffset int +} + +func (i *IndexSnapshotSynonymTermReader) Size() int { + sizeInBytes := reflectStaticSizeIndexSnapshotSynonymTermReader + size.SizeOfPtr + + len(i.name) + + for _, thesaurus := range i.thesauri { + sizeInBytes += thesaurus.Size() + } + + for _, postings := range i.postings { + sizeInBytes += postings.Size() + } + + for _, iterator := range i.iterators { + sizeInBytes += iterator.Size() + } + + return sizeInBytes +} + +func (i *IndexSnapshotSynonymTermReader) Next() (string, error) { + // find the next hit + for 
i.segmentOffset < len(i.iterators) { + if i.iterators[i.segmentOffset] != nil { + next, err := i.iterators[i.segmentOffset].Next() + if err != nil { + return "", err + } + if next != nil { + synTerm := next.Term() + return synTerm, nil + } + } + i.segmentOffset++ + } + return "", nil +} + +func (i *IndexSnapshotSynonymTermReader) Close() error { + if i.snapshot != nil { + i.snapshot.recycleSynonymTermReader(i) + } + return nil +} diff --git a/mapping/document.go b/mapping/document.go index 847326e41..5d70af912 100644 --- a/mapping/document.go +++ b/mapping/document.go @@ -112,6 +112,17 @@ func (dm *DocumentMapping) analyzerNameForPath(path string) string { return "" } +// synonymSourceForPath attempts to first find the field +// described by this path, then returns the synonym source +// configured for that field +func (dm *DocumentMapping) synonymSourceForPath(path string) string { + field := dm.fieldDescribedByPath(path) + if field != nil { + return field.SynonymSource + } + return "" +} + func (dm *DocumentMapping) fieldDescribedByPath(path string) *FieldMapping { pathElements := decodePath(path) if len(pathElements) > 1 { diff --git a/mapping/field.go b/mapping/field.go index 5c064fddd..ad5b4f424 100644 --- a/mapping/field.go +++ b/mapping/field.go @@ -80,6 +80,8 @@ type FieldMapping struct { // Applicable to vector fields only - optimization string VectorIndexOptimizedFor string `json:"vector_index_optimized_for,omitempty"` + + SynonymSource string `json:"synonym_source,omitempty"` } // NewTextFieldMapping returns a default field mapping for text diff --git a/mapping/index.go b/mapping/index.go index fe8c96713..94b2cdfa7 100644 --- a/mapping/index.go +++ b/mapping/index.go @@ -54,6 +54,7 @@ type IndexMappingImpl struct { IndexDynamic bool `json:"index_dynamic"` DocValuesDynamic bool `json:"docvalues_dynamic"` CustomAnalysis *customAnalysis `json:"analysis,omitempty"` + SynonymSources map[string]*SynonymSource `json:"synonym_sources,omitempty"` cache *registry.Cache 
} @@ -186,6 +187,12 @@ func (im *IndexMappingImpl) Validate() error { return err } } + for _, synSource := range im.SynonymSources { + err = synSource.Validate(im.cache) + if err != nil { + return err + } + } return nil } @@ -283,6 +290,14 @@ func (im *IndexMappingImpl) UnmarshalJSON(data []byte) error { if err != nil { return err } + case "synonym_sources": + if im.SynonymSources == nil { + im.SynonymSources = make(map[string]*SynonymSource) + } + err := util.UnmarshalJSON(v, &im.SynonymSources) + if err != nil { + return err + } default: invalidKeys = append(invalidKeys, k) } @@ -457,3 +472,25 @@ func (im *IndexMappingImpl) FieldMappingForPath(path string) FieldMapping { func (im *IndexMappingImpl) DefaultSearchField() string { return im.DefaultField } + +func (im *IndexMappingImpl) SynonymSourceForPath(path string) string { + // first we look for explicit mapping on the field + for _, docMapping := range im.TypeMapping { + synonymSource := docMapping.synonymSourceForPath(path) + if synonymSource != "" { + return synonymSource + } + } + + // now try the default mapping + pathMapping, _ := im.DefaultMapping.documentMappingForPath(path) + if pathMapping != nil { + if len(pathMapping.Fields) > 0 { + if pathMapping.Fields[0].SynonymSource != "" { + return pathMapping.Fields[0].SynonymSource + } + } + } + + return "" +} diff --git a/mapping/synonym.go b/mapping/synonym.go new file mode 100644 index 000000000..5065b3b2a --- /dev/null +++ b/mapping/synonym.go @@ -0,0 +1,56 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package mapping + +import ( + "fmt" + + "github.com/blevesearch/bleve/v2/registry" +) + +type SynonymSource struct { + CollectionName string `json:"collection"` + AnalyzerName string `json:"analyzer"` +} + +func (s *SynonymSource) Collection() string { + return s.CollectionName +} + +func (s *SynonymSource) Analyzer() string { + return s.AnalyzerName +} + +func (s *SynonymSource) SetCollection(c string) { + s.CollectionName = c +} + +func (s *SynonymSource) SetAnalyzer(a string) { + s.AnalyzerName = a +} + +func (s *SynonymSource) Validate(c *registry.Cache) error { + if s.CollectionName == "" { + return fmt.Errorf("collection name is required") + } + if s.AnalyzerName == "" { + return fmt.Errorf("analyzer name is required") + } + _, err := c.AnalyzerNamed(s.AnalyzerName) + if err != nil { + return fmt.Errorf("analyzer named '%s' not found", s.AnalyzerName) + } + return nil +} diff --git a/pre_search.go b/pre_search.go index c8c55bfbc..646d25199 100644 --- a/pre_search.go +++ b/pre_search.go @@ -26,6 +26,8 @@ type preSearchResultProcessor interface { finalize(*SearchResult) } +// ----------------------------------------------------------------------------- +// KNN preSearchResultProcessor for handling KNN presearch results type knnPreSearchResultProcessor struct { addFn func(sr *SearchResult, indexName string) finalizeFn func(sr *SearchResult) @@ -44,16 +46,77 @@ func (k *knnPreSearchResultProcessor) finalize(sr *SearchResult) { } // ----------------------------------------------------------------------------- +// Synonym preSearchResultProcessor for handling Synonym presearch results +type synonymPreSearchResultProcessor struct { + addFn func(sr *SearchResult, indexName string) + finalizeFn func(sr *SearchResult) +} -func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) { - if requestHasKNN(req) { - preSearchResult.Hits = 
finalizeKNNResults(req, preSearchResult.Hits) +func (s *synonymPreSearchResultProcessor) add(sr *SearchResult, indexName string) { + if s.addFn != nil { + s.addFn(sr, indexName) } } +func (s *synonymPreSearchResultProcessor) finalize(sr *SearchResult) { + if s.finalizeFn != nil { + s.finalizeFn(sr) + } +} + +// ----------------------------------------------------------------------------- +// Master struct that can hold any number of presearch result processors +type compositePreSearchResultProcessor struct { + presearchResultProcessors []preSearchResultProcessor +} + +// Implements the add method, which forwards to all the internal processors +func (m *compositePreSearchResultProcessor) add(sr *SearchResult, indexName string) { + for _, p := range m.presearchResultProcessors { + p.add(sr, indexName) + } +} + +// Implements the finalize method, which forwards to all the internal processors +func (m *compositePreSearchResultProcessor) finalize(sr *SearchResult) { + for _, p := range m.presearchResultProcessors { + p.finalize(sr) + } +} + +// ----------------------------------------------------------------------------- +// Function to create the appropriate preSearchResultProcessor(s) func createPreSearchResultProcessor(req *SearchRequest) preSearchResultProcessor { + var processors []preSearchResultProcessor + // Add KNN processor if the request has KNN + if requestHasKNN(req) { + if knnProcessor := newKnnPreSearchResultProcessor(req); knnProcessor != nil { + processors = append(processors, knnProcessor) + } + } + // Add Synonym processor if the request has Synonym + if requestHasSynonym(req) { + if synonymProcessor := newSynonymPreSearchResultProcessor(req); synonymProcessor != nil { + processors = append(processors, synonymProcessor) + } + } + // Return based on the number of processors, optimizing for the common case of 1 processor + // If there are no processors, return nil + switch len(processors) { + case 0: + return nil + case 1: + return processors[0] + 
default: + return &compositePreSearchResultProcessor{ + presearchResultProcessors: processors, + } + } +} + +// ----------------------------------------------------------------------------- +func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) { if requestHasKNN(req) { - return newKnnPreSearchResultProcessor(req) + preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits) } - return &knnPreSearchResultProcessor{} // equivalent to nil }