From c951cf562ab0f911ee760c8be47c19aba98504b9 Mon Sep 17 00:00:00 2001 From: Alex Klibisz Date: Sun, 21 Jun 2020 13:12:19 -0400 Subject: [PATCH] Using MoreLikeThis under the hood for LSH queries (#91) --- .github/workflows/benchmark.yaml | 8 ++- .../elastiknn/benchmarks/Aggregate.scala | 9 +-- .../benchmarks/ContinuousBenchmark.scala | 34 ++++++++--- .../elastiknn/benchmarks/DatasetClient.scala | 32 +++++----- .../elastiknn/benchmarks/Execute.scala | 6 +- .../elastiknn/benchmarks/ResultClient.scala | 2 +- .../elastiknn/benchmarks/package.scala | 31 ++++++---- changelog.md | 5 ++ .../com/klibisz/elastiknn/api/package.scala | 29 ++++----- docs/pages/api.md | 60 +++++++++++++------ examples/demo/webapp/app/models/Dataset.scala | 16 ++--- .../elastiknn/query/KnnQueryBuilder.scala | 16 ++--- .../klibisz/elastiknn/query/LshQuery.scala | 29 ++++++--- .../api/ElasticsearchCodecSuite.scala | 9 +-- .../query/NearestNeighborsQuerySuite.scala | 10 ++-- version | 2 +- 16 files changed, 180 insertions(+), 118 deletions(-) diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml index 08b432b98..0b5db5057 100644 --- a/.github/workflows/benchmark.yaml +++ b/.github/workflows/benchmark.yaml @@ -6,9 +6,12 @@ name: Benchmark on: repository_dispatch: types: benchmark + schedule: + - cron: '0 0 * * *' push: branches: - master + - perf-* jobs: benchmark: name: Benchmark @@ -54,13 +57,14 @@ jobs: - name: Run Benchmark run: make benchmarks/continuous/run - name: Print Results - run: find . -name aggregate.csv -type f | xargs cat + run: | + python3 -m pip install csv2md + find . -name aggregate.csv -type f | xargs python3 -m csv2md - name: Report Results env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} RUN_ID: ${{ github.run_id }} run: | - python3 -m pip install csv2md COMMIT=$(git rev-parse --verify HEAD) \ RESULTS_TABLE=$(find . 
-name aggregate.csv -type f | xargs python3 -m csv2md) \ envsubst < .github/scripts/benchmarks-template.md > comment.md diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Aggregate.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Aggregate.scala index a8ce40560..192043c43 100644 --- a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Aggregate.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Aggregate.scala @@ -2,12 +2,9 @@ package com.klibisz.elastiknn.benchmarks import java.io.File -import com.klibisz.elastiknn.api.Mapping._ -import com.klibisz.elastiknn.api._ import kantan.csv._ import kantan.csv.ops._ import kantan.csv.generic._ -import org.apache.commons.math3.stat.descriptive.rank.Percentile import zio._ import zio.blocking.Blocking import zio.console.Console @@ -56,13 +53,13 @@ object Aggregate extends App { log.info(agg.toString).map(_ => agg) } - aggregates <- aggStream.run(ZSink.collectAll).map(_.sortBy(a => (a.dataset, a.similarity, a.algorithm))) + rows <- aggStream.run(ZSink.collectAll).map(_.sortBy(a => (a.dataset, a.similarity, a.algorithm))) // Write the rows to a temporary file csvFile = File.createTempFile("tmp", ".csv") writer = csvFile.asCsvWriter[AggregateResult](rfc.withHeader(AggregateResult.header: _*)) - _ = aggregates.foreach(writer.write) - _ <- log.info(s"Wrote ${aggregates.length} rows to csv file.") + _ = rows.foreach(writer.write) + _ <- log.info(s"Wrote ${rows.length} rows to csv file.") _ = writer.close() // Upload the file. 
diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ContinuousBenchmark.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ContinuousBenchmark.scala index 684bc0c45..8971a1494 100644 --- a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ContinuousBenchmark.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ContinuousBenchmark.scala @@ -9,27 +9,45 @@ import zio.console.Console */ object ContinuousBenchmark extends App { - private val randomDenseFloats = Dataset.RandomDenseFloat(500, 10000) - // private val randomSparseBools = Dataset.RandomSparseBool(500, 10000) + private val randomDenseFloats = Dataset.RandomDenseFloat(1000, 50000, 1000) + private val randomSparseBools = Dataset.RandomSparseBool(3000, 50000, 1000) private val field = "vec" private val bucket = s"elastiknn-benchmarks" + private val k = 100 private val experiments = Seq( - // L2 exact / LSH + // L2 exact, LSH Experiment( randomDenseFloats, Mapping.DenseFloat(randomDenseFloats.dims), NearestNeighborsQuery.Exact(field, Similarity.L2), - Mapping.L2Lsh(randomDenseFloats.dims, 300, 1, 3), - Seq(Query(NearestNeighborsQuery.L2Lsh(field, 1000), 100)) + Mapping.L2Lsh(randomDenseFloats.dims, 400, 1, 3), + Seq( + Query(NearestNeighborsQuery.L2Lsh(field, 1000), k), + Query(NearestNeighborsQuery.L2Lsh(field, 1300, useMLTQuery = true), k) + ) ), - // Angular exact / LSH + // Angular exact, LSH Experiment( randomDenseFloats, Mapping.DenseFloat(randomDenseFloats.dims), NearestNeighborsQuery.Exact(field, Similarity.Angular), - Mapping.AngularLsh(randomDenseFloats.dims, 300, 1), - Seq(Query(NearestNeighborsQuery.AngularLsh(field, 1000), 100)) + Mapping.AngularLsh(randomDenseFloats.dims, 400, 1), + Seq( + Query(NearestNeighborsQuery.AngularLsh(field, 1000), k), + Query(NearestNeighborsQuery.AngularLsh(field, 1300, useMLTQuery = true), k), + ) + ), + // Jaccard exact, sparse indexed, LSH + Experiment( + randomSparseBools, + 
Mapping.SparseBool(randomSparseBools.dims), + NearestNeighborsQuery.Exact(field, Similarity.Jaccard), + Mapping.JaccardLsh(randomSparseBools.dims, 400, 1), + Seq( + Query(NearestNeighborsQuery.JaccardLsh(field, 1000), k), + Query(NearestNeighborsQuery.JaccardLsh(field, 1300, useMLTQuery = true), k) + ) ) ) diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/DatasetClient.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/DatasetClient.scala index d0846ad3f..96e169799 100644 --- a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/DatasetClient.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/DatasetClient.scala @@ -16,35 +16,33 @@ import scala.util.hashing.MurmurHash3 object DatasetClient { trait Service { - def streamTrain[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, V] - def streamTest[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, V] + def streamTrain(dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, Vec] + def streamTest(dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, Vec] } - /** Implementation of [[DatasetClient.Service]] that reads from an s3 bucket. */ + /** + * Implementation of [[DatasetClient.Service]] that reads from an s3 bucket. + * Special case for random datasets. 
+ * @return + */ def s3(bucket: String, keyPrefix: String): ZLayer[Has[AmazonS3], Throwable, DatasetClient] = ZLayer.fromService[AmazonS3, Service] { client => new Service { - private def stream[V <: Vec: ElasticsearchCodec](dataset: Dataset, name: String, limit: Option[Int]): Stream[Throwable, V] = + private def stream(dataset: Dataset, name: String, limit: Option[Int]): Stream[Throwable, Vec] = dataset match { case r: RandomSparseBool => implicit val rng: Random = new Random(MurmurHash3.orderedHash(Seq(r.dims, name))) Stream - .range(0, if (name == "train") r.count else r.count / 10) - .map(_ => Vec.SparseBool.random(r.dims)) - .map(ElasticsearchCodec.encode(_).hcursor) - .map(ElasticsearchCodec.decode[V](_)) - .mapM(ZIO.fromEither(_)) + .range(0, if (name == "train") r.train else r.test) + .map(_ => Vec.SparseBool.random(r.dims, r.bias)) case r: RandomDenseFloat => implicit val rng: Random = new Random(MurmurHash3.orderedHash(Seq(r.dims, name))) Stream - .range(0, if (name == "train") r.count else r.count / 10) + .range(0, if (name == "train") r.train else r.test) .map(_ => Vec.DenseFloat.random(r.dims)) - .map(ElasticsearchCodec.encode(_).hcursor) - .map(ElasticsearchCodec.decode[V](_)) - .mapM(ZIO.fromEither(_)) case _ => - def parseDecode(s: String): Either[circe.Error, V] = - ElasticsearchCodec.parse(s).flatMap(j => ElasticsearchCodec.decode[V](j.hcursor)) + def parseDecode(s: String): Either[circe.Error, Vec] = + ElasticsearchCodec.parse(s).flatMap(j => ElasticsearchCodec.decode[Vec](j.hcursor)) val obj = client.getObject(bucket, s"$keyPrefix/${dataset.name}/${name}.json.gz") val iterManaged = Managed.makeEffect(Source.fromInputStream(new GZIPInputStream(obj.getObjectContent)))(_.close()) val lines = Stream.fromIteratorManaged(iterManaged.map(src => limit.map(n => src.getLines.take(n)).getOrElse(src.getLines()))) @@ -52,10 +50,10 @@ object DatasetClient { rawJson.mapM(s => ZIO.fromEither(parseDecode(s))) } - override def streamTrain[V <: Vec: 
ElasticsearchCodec](dataset: Dataset, limit: Option[Int]): Stream[Throwable, V] = + override def streamTrain(dataset: Dataset, limit: Option[Int]): Stream[Throwable, Vec] = stream(dataset, "train", limit) - override def streamTest[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int]): Stream[Throwable, V] = + override def streamTest(dataset: Dataset, limit: Option[Int]): Stream[Throwable, Vec] = stream(dataset, "test", limit) } } diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala index 9dab297fc..59070b2c6 100644 --- a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala @@ -72,7 +72,7 @@ object Execute extends App { parallelism: Int): ZIO[Logging with Clock with DatasetClient with ElastiknnZioClient, Throwable, BenchmarkResult] = { // Index name is a function of dataset, mapping and holdout so we can check if it already exists and avoid re-indexing. - val trainIndexName = s"ix-${dataset.name}-${MurmurHash3.orderedHash(Seq(dataset, eknnMapping))}" + val trainIndexName = s"ix-${dataset.name}-${MurmurHash3.orderedHash(Seq(dataset, eknnMapping))}".toLowerCase val testIndexName = s"$trainIndexName-test" // Create a primary and holdout index with same mappings. 
@@ -87,7 +87,7 @@ object Execute extends App { _ <- eknnClient.execute(createIndex(testIndexName).replicas(0).shards(parallelism).indexSetting("refresh_interval", "-1")) datasets <- ZIO.access[DatasetClient](_.get) _ <- log.info(s"Indexing vectors for dataset $dataset") - _ <- datasets.streamTrain[Vec](dataset).grouped(chunkSize).zipWithIndex.foreach { + _ <- datasets.streamTrain(dataset).grouped(chunkSize).zipWithIndex.foreach { case (vecs, batchIndex) => val ids = Some(vecs.indices.map(i => s"$batchIndex-$i")) for { @@ -95,7 +95,7 @@ object Execute extends App { _ <- log.debug(s"Indexed batch $batchIndex to $trainIndexName in ${dur.toMillis} ms") } yield () } - _ <- datasets.streamTest[Vec](dataset).grouped(chunkSize).zipWithIndex.foreach { + _ <- datasets.streamTest(dataset).grouped(chunkSize).zipWithIndex.foreach { case (vecs, batchIndex) => val ids = Some(vecs.indices.map(i => s"$batchIndex-$i")) for { diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ResultClient.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ResultClient.scala index bd64dd774..81942f17c 100644 --- a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ResultClient.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/ResultClient.scala @@ -67,7 +67,7 @@ object ResultClient { @tailrec def readAllKeys(req: ListObjectsV2Request, agg: Vector[String] = Vector.empty): Vector[String] = { val res = client.listObjectsV2(req) - val keys = res.getObjectSummaries.asScala.toVector.map(_.getKey) + val keys = res.getObjectSummaries.asScala.toVector.map(_.getKey).filter(_.endsWith(".json")) if (res.isTruncated) readAllKeys(req.withContinuationToken(res.getNextContinuationToken), agg ++ keys) else agg ++ keys } diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/package.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/package.scala index a71d0bc7e..e4e466318 100644 --- 
a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/package.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/package.scala @@ -20,7 +20,7 @@ package object benchmarks { type ElastiknnZioClient = Has[ElastiknnZioClient.Service] sealed abstract class Dataset(val dims: Int) { - final def name: String = this.getClass.getSimpleName.toLowerCase.replace("$", "") + def name: String = this.getClass.getSimpleName.replace("$", "") } object Dataset { case object AmazonHome extends Dataset(4096) @@ -36,8 +36,13 @@ package object benchmarks { case object AnnbMnist extends Dataset(784) case object AnnbNyt extends Dataset(256) case object AnnbSift extends Dataset(128) - case class RandomDenseFloat(override val dims: Int = 1024, count: Int = 10000) extends Dataset(dims) - case class RandomSparseBool(override val dims: Int = 4096, count: Int = 10000) extends Dataset(dims) + case class RandomDenseFloat(override val dims: Int = 1024, train: Int = 50000, test: Int = 1000) extends Dataset(dims) { + override def name: String = s"Random${dims}d${train / 1000}K${test / 1000}K" + } + case class RandomSparseBool(override val dims: Int = 4096, train: Int = 50000, test: Int = 1000, bias: Double = 0.25) + extends Dataset(dims) { + override def name: String = s"Random${dims}d${train / 1000}K${test / 1000}K" + } } final case class Query(nnq: NearestNeighborsQuery, k: Int) @@ -91,23 +96,25 @@ package object benchmarks { "query" ) - private def mappingToAlgorithmName(m: Mapping): String = m match { - case _: SparseBool => s"Exact" - case _: DenseFloat => "exact" - case _: SparseIndexed => "sparse indexed" - case _: JaccardLsh | _: HammingLsh | _: AngularLsh | _: L2Lsh => "lsh" + private def algorithmName(m: Mapping, q: NearestNeighborsQuery): String = m match { + case _: SparseBool => s"Exact" + case _: DenseFloat => "Exact" + case _: SparseIndexed => "Sparse indexed" + case _: JaccardLsh | _: HammingLsh | _: AngularLsh | _: L2Lsh => + q match { + case lsh: 
NearestNeighborsQuery.LshQuery if lsh.useMLTQuery => "LSH w/ MLT" + case _ => "LSH" + } } - private def queryToSimilarityName(q: NearestNeighborsQuery): String = q.similarity.toString - def apply(benchmarkResult: BenchmarkResult): AggregateResult = { val ptile = new Percentile() val recalls = benchmarkResult.queryResults.map(_.recall).toArray val durations = benchmarkResult.queryResults.map(_.duration.toDouble).toArray new AggregateResult( benchmarkResult.dataset.name, - queryToSimilarityName(benchmarkResult.query), - mappingToAlgorithmName(benchmarkResult.mapping), + benchmarkResult.query.similarity.toString, + algorithmName(benchmarkResult.mapping, benchmarkResult.query), benchmarkResult.k, ptile.evaluate(recalls, 0.1).toFloat, ptile.evaluate(durations, 0.1).toFloat, diff --git a/changelog.md b/changelog.md index 93ec3f4fb..67c549bb8 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,8 @@ +- Added an option for LSH queries to use the more-like-this heuristics to pick a subset of LSH hashes to retrieve candidate vectors. + Uses Lucene's [MoreLikeThis class](https://lucene.apache.org/core/8_5_0/queries/org/apache/lucene/queries/mlt/MoreLikeThis.html) + to pick a subset of hashes based on index statistics. It's generally much faster than using _all_ of the hashes, + yields comparable recall, but is still disabled by default. +--- - Using ConstantScoreQuery to wrap the TermQuery's used for matching hashes in Elastiknn's SparseIndexQuery and LshQuery. Improves the SparseIndexedQuery benchmark from ~66 seconds to ~48 seconds. Improves the LshQuery benchmark from ~37 seconds to ~31 seconds. 
diff --git a/core/src/main/scala/com/klibisz/elastiknn/api/package.scala b/core/src/main/scala/com/klibisz/elastiknn/api/package.scala index b0f69c831..1bba5b73a 100644 --- a/core/src/main/scala/com/klibisz/elastiknn/api/package.scala +++ b/core/src/main/scala/com/klibisz/elastiknn/api/package.scala @@ -1,5 +1,7 @@ package com.klibisz.elastiknn +import jdk.jfr.Experimental + import scala.util.Random package object api { @@ -107,37 +109,32 @@ package object api { def apply(field: String, similarity: Similarity): SparseIndexed = SparseIndexed(field, Vec.Empty(), similarity) } - final case class JaccardLsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery { + sealed trait LshQuery extends NearestNeighborsQuery { + def candidates: Int + + @Experimental + def useMLTQuery: Boolean + } + + final case class JaccardLsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery { override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v) override def similarity: Similarity = Similarity.Jaccard } - object JaccardLsh { - def apply(field: String, candidates: Int): JaccardLsh = JaccardLsh(field, Vec.Empty(), candidates) - } - final case class HammingLsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery { + final case class HammingLsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery { override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v) override def similarity: Similarity = Similarity.Hamming } - object HammingLsh { - def apply(field: String, candidates: Int): HammingLsh = HammingLsh(field, Vec.Empty(), candidates) - } - final case class AngularLsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery { + final case class AngularLsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery { override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = 
v) override def similarity: Similarity = Similarity.Angular } - object AngularLsh { - def apply(field: String, candidates: Int): AngularLsh = AngularLsh(field, Vec.Empty(), candidates) - } - final case class L2Lsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery { + final case class L2Lsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery { override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v) override def similarity: Similarity = Similarity.L2 } - object L2Lsh { - def apply(field: String, candidates: Int): L2Lsh = L2Lsh(field, Vec.Empty(), candidates) - } } } diff --git a/docs/pages/api.md b/docs/pages/api.md index 7decb531e..399aef57d 100644 --- a/docs/pages/api.md +++ b/docs/pages/api.md @@ -387,7 +387,7 @@ Elasticsearch queries must return a non-negative floating-point score. For Elast |L1|`1 / (l1 distance + 1e-6)`|0|1e6| |L2|`1 / (l2 distance + 1e-6)`|0|1e6| -If you're using the `elastiknn_nearest_neighbors` query with other queries and the score values are inconvenient (e.g. huge values like 1e6), consider wrapping the query in a [Script Score Query](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-script-score-query.html), where you can access and transform the `_score` value. +If you're using the `elastiknn_nearest_neighbors` query with other queries, and the score values are inconvenient (e.g. huge values like 1e6), consider wrapping the query in a [Script Score Query](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-script-score-query.html), where you can access and transform the `_score` value. ### Query Vector @@ -480,15 +480,29 @@ GET /my-index/_search ### LSH Search Strategy -All of the LSH search models follow roughly the same strategy. They first retrieve approximate neighbors based on common hash terms and then compute the slower exact similarity for a small subset of the best approximate candidates. 
The exact steps are as follows: +All LSH search models follow roughly the same strategy. +They first retrieve approximate neighbors based on common hash terms and then compute the exact similarity for a subset of the best approximate candidates. +The exact steps are as follows: 1. Hash the query vector using model parameters that were specified in the indexed vector's mapping. -2. Convert the hash values to Lucene Terms in a Lucene Boolean Query which uses an inverted index to compute the size of the intersection of hash terms between the query vector and indexed vectors. -3. Vectors with non-empty intersections are passed to a scoring function which maintains a min-heap of size `candidates`. Notice `candidates` is a parameter for all LSH queries. The heap maintains the highest-scoring approximate neighbors. If an indexed vector exceeds the lowest approximate score in the heap, we compute its exact similarity and replace it in the heap. Otherwise it gets a score of 0. - -If you set `candidates` to 0, the query skips all heap-related logic and exact similarity computations. The score for each vector is the number of intersecting hash terms. - -It seems reasonable to set `candidates` to 2x to 10x larger than the number of hits you want to return (Elasticsearch defaults to 10 hits). For example, if you have 100,000 vectors in your index, you want the 10 nearest neighbors, and you set `candidates` to 100, then your query will compute the exact similarity roughly 100 times per shard. +2. Use the hash values to create an approximate query using one of two strategies: + - A [boolean query](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html) with a _should_ clauses for every hash. + This is currently the default. 
+ - A boolean query with _should match_ clauses for a subset of terms, with terms picked using heuristics from the + [More Like This query](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html). + This is generally faster, but is still experimental. To use this strategy, set `useMLTQuery` to `true` in the query JSON. +3. Execute the approximate query to get vectors with matching hashes. +4. Pass vectors with matching hashes to a scoring function which maintains a min-heap of size `candidates`. +The heap maintains the highest-scoring approximate neighbors. +If an indexed vector exceeds the lowest approximate score in the heap, compute its exact similarity and replace it in the heap. +Otherwise, it gets a score of 0. + +If you set `candidates` to 0, the query skips all heap-related logic and exact similarity computations. +The score for each vector is the number of intersecting hash terms. + +It seems reasonable to set `candidates` to 2x to 10x larger than the number of hits you want to return (Elasticsearch defaults to 10 hits). +For example, if you have 100,000 vectors in your index, you want the 10 nearest neighbors, and you set `candidates` to 100, +then your query will compute the exact similarity roughly 100 times per shard. ### Jaccard LSH Query @@ -507,6 +521,7 @@ GET /my-index/_search "model": "lsh", # 3 "similarity": "jaccard", # 4 "candidates": 50, # 5 + "useMLTQuery": false # 6 } } } @@ -519,6 +534,7 @@ GET /my-index/_search |3|Model name.| |4|Similarity function.| |5|Number of candidates. See the section on LSH Search Strategy.| +|6|Set to true to use the more-like-this heuristic to pick a subset of hashes. Generally faster but still experimental.| ### Hamming LSH Query @@ -537,6 +553,7 @@ GET /my-index/_search "model": "lsh", # 3 "similarity": "hamming", # 4 "candidates": 50, # 5 + "useMLTQuery": false # 6 } } } @@ -549,6 +566,7 @@ GET /my-index/_search |3|Model name.| |4|Similarity function.| |5|Number of candidates.
See the section on LSH Search Strategy.| +|6|Set to true to use the more-like-this heuristic to pick a subset of hashes. Generally faster but still experimental.| ### Angular LSH Query @@ -559,13 +577,14 @@ GET /my-index/_search { "query": { "elastiknn_nearest_neighbors": { - "field": "my_vec", # 1 - "vec": { # 2 + "field": "my_vec", # 1 + "vec": { # 2 "values": [0.1, 0.2, 0.3, ...] }, - "model": "lsh", # 3 - "similarity": "angular", # 4 - "candidates": 50, # 5 + "model": "lsh", # 3 + "similarity": "angular", # 4 + "candidates": 50, # 5 + "useMLTQuery": false # 6 } } } @@ -578,10 +597,11 @@ GET /my-index/_search |3|Model name.| |4|Similarity function.| |5|Number of candidates. See the section on LSH Search Strategy.| +|6|Set to true to use the more-like-this heuristic to pick a subset of hashes. Generally faster but still experimental.| ### L1 LSH Query -Work in progress. +Not yet implemented. ### L2 LSH Query @@ -592,13 +612,14 @@ GET /my-index/_search { "query": { "elastiknn_nearest_neighbors": { - "field": "my_vec", # 1 - "vec": { # 2 + "field": "my_vec", # 1 + "vec": { # 2 "values": [0.1, 0.2, 0.3, ...] }, - "model": "lsh", # 3 - "similarity": "l2", # 4 - "candidates": 50 # 5 + "model": "lsh", # 3 + "similarity": "l2", # 4 + "candidates": 50, # 5 + "useMLTQuery": false # 6 } } } @@ -611,6 +632,7 @@ GET /my-index/_search |3|Model name.| |4|Similarity function.| |5|Number of candidates. See the section on LSH Search Strategy.| +|6|Set to true to use the more-like-this heuristic to pick a subset of hashes.
Generally faster but still experimental.| ## Mapping and Query Compatibility diff --git a/examples/demo/webapp/app/models/Dataset.scala b/examples/demo/webapp/app/models/Dataset.scala index 0041faf75..100cd9b90 100644 --- a/examples/demo/webapp/app/models/Dataset.scala +++ b/examples/demo/webapp/app/models/Dataset.scala @@ -50,11 +50,11 @@ object Dataset extends ElastiknnRequests { example("Jaccard LSH #1", "mnist-jaccard-lsh-1", Mapping.JaccardLsh(784, 100, 1), - (f, v) => NearestNeighborsQuery.JaccardLsh(f, v, 100)), + (f, v) => NearestNeighborsQuery.JaccardLsh(f, 100, v)), example("Jaccard LSH #2", "mnist-jaccard-lsh-2", Mapping.JaccardLsh(784, 100, 1), - (f, v) => NearestNeighborsQuery.JaccardLsh(f, v, 20)), + (f, v) => NearestNeighborsQuery.JaccardLsh(f, 20, v)), ) ), Dataset( @@ -74,11 +74,11 @@ object Dataset extends ElastiknnRequests { example("Hamming LSH #1", "mnist-hamming-lsh-1", Mapping.HammingLsh(784, 100), - (f, v) => NearestNeighborsQuery.HammingLsh(f, v, 100)), + (f, v) => NearestNeighborsQuery.HammingLsh(f, 100, v)), example("Hamming LSH #2", "mnist-hamming-lsh-2", Mapping.HammingLsh(784, 100), - (f, v) => NearestNeighborsQuery.HammingLsh(f, v, 20)), + (f, v) => NearestNeighborsQuery.HammingLsh(f, 20, v)), ) ), Dataset( @@ -96,11 +96,11 @@ object Dataset extends ElastiknnRequests { example("Angular LSH 1", "word2vec-google-angular-lsh-1", Mapping.AngularLsh(300, 100, 1), - (f, v) => NearestNeighborsQuery.AngularLsh(f, v, 100)), + (f, v) => NearestNeighborsQuery.AngularLsh(f, 100, v)), example("Angular LSH 2", "word2vec-google-angular-lsh-2", Mapping.AngularLsh(300, 100, 1), - (f, v) => NearestNeighborsQuery.AngularLsh(f, v, 20)), + (f, v) => NearestNeighborsQuery.AngularLsh(f, 20, v)), ) ), Dataset( @@ -110,8 +110,8 @@ object Dataset extends ElastiknnRequests { "https://keras.io/datasets/", Seq( example("Exact", "cifar-l2-exact", Mapping.DenseFloat(3072), (f, v) => NearestNeighborsQuery.Exact(f, v, Similarity.L2)), - example("L2 LSH #1", 
"cifar-l2-lsh-1", Mapping.L2Lsh(3072, 100, 1, 3), (f, v) => NearestNeighborsQuery.L2Lsh(f, v, 100)), - example("L2 LSH #2", "cifar-l2-lsh-2", Mapping.L2Lsh(3072, 100, 1, 3), (f, v) => NearestNeighborsQuery.L2Lsh(f, v, 20)), + example("L2 LSH #1", "cifar-l2-lsh-1", Mapping.L2Lsh(3072, 100, 1, 3), (f, v) => NearestNeighborsQuery.L2Lsh(f, 100, v)), + example("L2 LSH #2", "cifar-l2-lsh-2", Mapping.L2Lsh(3072, 100, 1, 3), (f, v) => NearestNeighborsQuery.L2Lsh(f, 20, v)), ) ) ) diff --git a/plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala index 807ed7d34..a4ed8c198 100644 --- a/plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala @@ -91,17 +91,17 @@ final case class KnnQueryBuilder(query: NearestNeighborsQuery) extends AbstractQ case (SparseIndexed(f, sbv: Vec.SparseBool, Similarity.Hamming), _: Mapping.SparseIndexed) => SparseIndexedQuery(f, sbv, SparseIndexedSimilarityFunction.Hamming) - case (JaccardLsh(f, v: Vec.SparseBool, candidates), m: Mapping.JaccardLsh) => - LshQuery(f, m, v, candidates, LshFunctionCache.Jaccard) + case (JaccardLsh(f, candidates, v: Vec.SparseBool, useMLTQuery), m: Mapping.JaccardLsh) => + LshQuery(f, m, v, candidates, LshFunctionCache.Jaccard, c.getIndexReader, useMLTQuery) - case (HammingLsh(f, v: Vec.SparseBool, candidates), m: Mapping.HammingLsh) => - LshQuery(f, m, v, candidates, LshFunctionCache.Hamming) + case (HammingLsh(f, candidates, v: Vec.SparseBool, useMLTQuery), m: Mapping.HammingLsh) => + LshQuery(f, m, v, candidates, LshFunctionCache.Hamming, c.getIndexReader, useMLTQuery) - case (AngularLsh(f, v: Vec.DenseFloat, candidates), m: Mapping.AngularLsh) => - LshQuery(f, m, v, candidates, LshFunctionCache.Angular) + case (AngularLsh(f, candidates, v: Vec.DenseFloat, useMLTQuery), m: Mapping.AngularLsh) => + LshQuery(f, m, v, candidates, 
LshFunctionCache.Angular, c.getIndexReader, useMLTQuery) - case (L2Lsh(f, v: Vec.DenseFloat, candidates), m: Mapping.L2Lsh) => - LshQuery(f, m, v, candidates, LshFunctionCache.L2) + case (L2Lsh(f, candidates, v: Vec.DenseFloat, useMLTQuery), m: Mapping.L2Lsh) => + LshQuery(f, m, v, candidates, LshFunctionCache.L2, c.getIndexReader, useMLTQuery) case _ => throw incompatible(mapping, query) } diff --git a/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala index 70c4f3e9c..c2bd88981 100644 --- a/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala @@ -1,5 +1,6 @@ package com.klibisz.elastiknn.query +import java.io.{ByteArrayInputStream, InputStreamReader} import java.lang import java.util.Objects @@ -8,8 +9,10 @@ import com.klibisz.elastiknn.api.{Mapping, Vec} import com.klibisz.elastiknn.mapper.VectorMapper import com.klibisz.elastiknn.models.LshFunction import com.klibisz.elastiknn.storage.{StoredVec, UnsafeSerialization} +import org.apache.lucene.analysis.core.KeywordAnalyzer import org.apache.lucene.document.Field import org.apache.lucene.index._ +import org.apache.lucene.queries.mlt.MoreLikeThis import org.apache.lucene.search._ import org.apache.lucene.util.BytesRef import org.elasticsearch.common.lucene.search.function.{CombineFunction, FunctionScoreQuery, LeafScoreFunction, ScoreFunction} @@ -63,16 +66,26 @@ object LshQuery { override def doHashCode(): Int = Objects.hash(field, query, lshFunc, lshFunc, candidates.asInstanceOf[AnyRef]) } - def apply[M <: Mapping, V <: Vec, S <: StoredVec]( - field: String, - mapping: M, - queryVec: V, - candidates: Int, - lshFunctionCache: LshFunctionCache[M, V, S])(implicit codec: StoredVec.Codec[V, S]): Query = { + def apply[M <: Mapping, V <: Vec, S <: StoredVec](field: String, + mapping: M, + queryVec: V, + candidates: Int, + lshFunctionCache: LshFunctionCache[M, 
V, S], + indexReader: IndexReader, + useMLTQuery: Boolean)(implicit codec: StoredVec.Codec[V, S]): Query = { val lshFunc: LshFunction[M, V, S] = lshFunctionCache(mapping) - val isecQuery: BooleanQuery = { + val hashes: Array[Int] = lshFunc(queryVec) + val isecQuery: Query = if (useMLTQuery) { + val mlt = new MoreLikeThis(indexReader) + mlt.setFieldNames(Array(field)) + mlt.setMinTermFreq(1) + mlt.setMaxQueryTerms(hashes.length) + mlt.setAnalyzer(new KeywordAnalyzer()) + val readers = hashes.map(h => new InputStreamReader(new ByteArrayInputStream(UnsafeSerialization.writeInt(h)))) + mlt.like(field, readers: _*) + } else { val builder = new BooleanQuery.Builder - lshFunc(queryVec).foreach { h => + hashes.foreach { h => val term = new Term(field, new BytesRef(UnsafeSerialization.writeInt(h))) val termQuery = new TermQuery(term) val constQuery = new ConstantScoreQuery(termQuery) diff --git a/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala index c9a4b6bc6..46a500eff 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala @@ -11,8 +11,8 @@ class ElasticsearchCodecSuite extends FunSuite with Matchers { implicit class CodecMatcher(s: String) { def shouldDecodeTo[T: ElasticsearchCodec](obj: T): Assertion = { - lazy val parsed: Either[circe.Error, Json] = ElasticsearchCodec.parse(s) - lazy val decoded: Either[circe.Error, T] = parsed.flatMap(ElasticsearchCodec.decodeJson[T]) + val parsed: Either[circe.Error, Json] = ElasticsearchCodec.parse(s) + val decoded: Either[circe.Error, T] = parsed.flatMap(ElasticsearchCodec.decodeJson[T]) withClue("can't parse the given json string") { parsed shouldBe 'right @@ -133,7 +133,7 @@ class ElasticsearchCodecSuite extends FunSuite with Matchers { |""".stripMargin.shouldNotDecodeTo[Mapping] } - test("nearest 
neighbor queries (revised)") { + test("nearest neighbor queries") { import NearestNeighborsQuery._ @@ -179,11 +179,12 @@ class ElasticsearchCodecSuite extends FunSuite with Matchers { | "model": "lsh", | "similarity": "jaccard", | "candidates": 100, + | "useMLTQuery": true, | "vec": { | "true_indices": [1,2,3], | "total_indices": 99 | } |} - |""".stripMargin shouldDecodeTo [NearestNeighborsQuery] JaccardLsh("vec", Vec.SparseBool(Array(1, 2, 3), 99), 100) + |""".stripMargin shouldDecodeTo [NearestNeighborsQuery] JaccardLsh("vec", 100, Vec.SparseBool(Array(1, 2, 3), 99), true) } } diff --git a/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQuerySuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQuerySuite.scala index 849faae0b..ff967232f 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQuerySuite.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQuerySuite.scala @@ -59,30 +59,30 @@ class NearestNeighborsQuerySuite extends AsyncFunSuite with Matchers with Inspec // Jaccard Lsh Test( d => Seq(Mapping.JaccardLsh(d, 20, 1)), - (f, v) => NearestNeighborsQuery.JaccardLsh(f, v, testDataNumQueries * 2), + (f, v) => NearestNeighborsQuery.JaccardLsh(f, testDataNumQueries * 2, v), 0.8 ), Test( d => Seq(Mapping.JaccardLsh(d, 40, 2)), - (f, v) => NearestNeighborsQuery.JaccardLsh(f, v, testDataNumQueries * 2), + (f, v) => NearestNeighborsQuery.JaccardLsh(f, testDataNumQueries * 2, v), 0.67 ), // Hamming Lsh Test( d => Seq(Mapping.HammingLsh(d, d / 2)), - (f, v) => NearestNeighborsQuery.HammingLsh(f, v, testDataNumQueries * 2), + (f, v) => NearestNeighborsQuery.HammingLsh(f, testDataNumQueries * 2, v), 0.9 ), // Angular Lsh Test( d => Seq(Mapping.AngularLsh(d, d / 2, 1)), - (f, v) => NearestNeighborsQuery.AngularLsh(f, v, testDataNumQueries * 3 / 2), + (f, v) => NearestNeighborsQuery.AngularLsh(f, testDataNumQueries * 3 / 2, v), 0.67 ), // L2 Lsh Test( d => 
Seq(Mapping.L2Lsh(d, d * 2 / 3, 1, 3)), - (f, v) => NearestNeighborsQuery.L2Lsh(f, v, testDataNumQueries * 3 / 2), + (f, v) => NearestNeighborsQuery.L2Lsh(f, testDataNumQueries * 3 / 2, v), 0.67 ) ) diff --git a/version b/version index 7b6ac039e..5e4ce8912 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.1.0-PRE19 +0.1.0-PRE20