Using MoreLikeThis under the hood for LSH queries (#91)
alexklibisz authored Jun 21, 2020
1 parent 25aa52a commit c951cf5
Showing 16 changed files with 180 additions and 118 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/benchmark.yaml
@@ -6,9 +6,12 @@ name: Benchmark
on:
repository_dispatch:
types: benchmark
schedule:
- cron: '0 0 * * *'
push:
branches:
- master
- perf-*
jobs:
benchmark:
name: Benchmark
@@ -54,13 +57,14 @@ jobs:
- name: Run Benchmark
run: make benchmarks/continuous/run
- name: Print Results
run: find . -name aggregate.csv -type f | xargs cat
run: |
python3 -m pip install csv2md
find . -name aggregate.csv -type f | xargs python3 -m csv2md
- name: Report Results
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
RUN_ID: ${{ github.run_id }}
run: |
python3 -m pip install csv2md
COMMIT=$(git rev-parse --verify HEAD) \
RESULTS_TABLE=$(find . -name aggregate.csv -type f | xargs python3 -m csv2md) \
envsubst < .github/scripts/benchmarks-template.md > comment.md
@@ -2,12 +2,9 @@ package com.klibisz.elastiknn.benchmarks

import java.io.File

import com.klibisz.elastiknn.api.Mapping._
import com.klibisz.elastiknn.api._
import kantan.csv._
import kantan.csv.ops._
import kantan.csv.generic._
import org.apache.commons.math3.stat.descriptive.rank.Percentile
import zio._
import zio.blocking.Blocking
import zio.console.Console
@@ -56,13 +53,13 @@ object Aggregate extends App {
log.info(agg.toString).map(_ => agg)
}

aggregates <- aggStream.run(ZSink.collectAll).map(_.sortBy(a => (a.dataset, a.similarity, a.algorithm)))
rows <- aggStream.run(ZSink.collectAll).map(_.sortBy(a => (a.dataset, a.similarity, a.algorithm)))

// Write the rows to a temporary file
csvFile = File.createTempFile("tmp", ".csv")
writer = csvFile.asCsvWriter[AggregateResult](rfc.withHeader(AggregateResult.header: _*))
_ = aggregates.foreach(writer.write)
_ <- log.info(s"Wrote ${aggregates.length} rows to csv file.")
_ = rows.foreach(writer.write)
_ <- log.info(s"Wrote ${rows.length} rows to csv file.")
_ = writer.close()

// Upload the file.
@@ -9,27 +9,45 @@ import zio.console.Console
*/
object ContinuousBenchmark extends App {

private val randomDenseFloats = Dataset.RandomDenseFloat(500, 10000)
// private val randomSparseBools = Dataset.RandomSparseBool(500, 10000)
private val randomDenseFloats = Dataset.RandomDenseFloat(1000, 50000, 1000)
private val randomSparseBools = Dataset.RandomSparseBool(3000, 50000, 1000)
private val field = "vec"
private val bucket = s"elastiknn-benchmarks"
private val k = 100

private val experiments = Seq(
// L2 exact / LSH
// L2 exact, LSH
Experiment(
randomDenseFloats,
Mapping.DenseFloat(randomDenseFloats.dims),
NearestNeighborsQuery.Exact(field, Similarity.L2),
Mapping.L2Lsh(randomDenseFloats.dims, 300, 1, 3),
Seq(Query(NearestNeighborsQuery.L2Lsh(field, 1000), 100))
Mapping.L2Lsh(randomDenseFloats.dims, 400, 1, 3),
Seq(
Query(NearestNeighborsQuery.L2Lsh(field, 1000), k),
Query(NearestNeighborsQuery.L2Lsh(field, 1300, useMLTQuery = true), k)
)
),
// Angular exact / LSH
// Angular exact, LSH
Experiment(
randomDenseFloats,
Mapping.DenseFloat(randomDenseFloats.dims),
NearestNeighborsQuery.Exact(field, Similarity.Angular),
Mapping.AngularLsh(randomDenseFloats.dims, 300, 1),
Seq(Query(NearestNeighborsQuery.AngularLsh(field, 1000), 100))
Mapping.AngularLsh(randomDenseFloats.dims, 400, 1),
Seq(
Query(NearestNeighborsQuery.AngularLsh(field, 1000), k),
Query(NearestNeighborsQuery.AngularLsh(field, 1300, useMLTQuery = true), k),
)
),
// Jaccard exact, sparse indexed, LSH
Experiment(
randomSparseBools,
Mapping.SparseBool(randomSparseBools.dims),
NearestNeighborsQuery.Exact(field, Similarity.Jaccard),
Mapping.JaccardLsh(randomSparseBools.dims, 400, 1),
Seq(
Query(NearestNeighborsQuery.JaccardLsh(field, 1000), k),
Query(NearestNeighborsQuery.JaccardLsh(field, 1300, useMLTQuery = true), k)
)
)
)

@@ -16,46 +16,44 @@ import scala.util.hashing.MurmurHash3
object DatasetClient {

trait Service {
def streamTrain[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, V]
def streamTest[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, V]
def streamTrain(dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, Vec]
def streamTest(dataset: Dataset, limit: Option[Int] = None): Stream[Throwable, Vec]
}

/** Implementation of [[DatasetClient.Service]] that reads from an s3 bucket. */
/**
* Implementation of [[DatasetClient.Service]] that reads from an s3 bucket.
* Random datasets are a special case: they are generated deterministically in memory rather than read from s3.
*/
def s3(bucket: String, keyPrefix: String): ZLayer[Has[AmazonS3], Throwable, DatasetClient] = ZLayer.fromService[AmazonS3, Service] {
client =>
new Service {
private def stream[V <: Vec: ElasticsearchCodec](dataset: Dataset, name: String, limit: Option[Int]): Stream[Throwable, V] =
private def stream(dataset: Dataset, name: String, limit: Option[Int]): Stream[Throwable, Vec] =
dataset match {
case r: RandomSparseBool =>
implicit val rng: Random = new Random(MurmurHash3.orderedHash(Seq(r.dims, name)))
Stream
.range(0, if (name == "train") r.count else r.count / 10)
.map(_ => Vec.SparseBool.random(r.dims))
.map(ElasticsearchCodec.encode(_).hcursor)
.map(ElasticsearchCodec.decode[V](_))
.mapM(ZIO.fromEither(_))
.range(0, if (name == "train") r.train else r.test)
.map(_ => Vec.SparseBool.random(r.dims, r.bias))
case r: RandomDenseFloat =>
implicit val rng: Random = new Random(MurmurHash3.orderedHash(Seq(r.dims, name)))
Stream
.range(0, if (name == "train") r.count else r.count / 10)
.range(0, if (name == "train") r.train else r.test)
.map(_ => Vec.DenseFloat.random(r.dims))
.map(ElasticsearchCodec.encode(_).hcursor)
.map(ElasticsearchCodec.decode[V](_))
.mapM(ZIO.fromEither(_))
case _ =>
def parseDecode(s: String): Either[circe.Error, V] =
ElasticsearchCodec.parse(s).flatMap(j => ElasticsearchCodec.decode[V](j.hcursor))
def parseDecode(s: String): Either[circe.Error, Vec] =
ElasticsearchCodec.parse(s).flatMap(j => ElasticsearchCodec.decode[Vec](j.hcursor))
val obj = client.getObject(bucket, s"$keyPrefix/${dataset.name}/${name}.json.gz")
val iterManaged = Managed.makeEffect(Source.fromInputStream(new GZIPInputStream(obj.getObjectContent)))(_.close())
val lines = Stream.fromIteratorManaged(iterManaged.map(src => limit.map(n => src.getLines.take(n)).getOrElse(src.getLines())))
val rawJson = lines.map(_.dropWhile(_ != '{'))
rawJson.mapM(s => ZIO.fromEither(parseDecode(s)))
}

override def streamTrain[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int]): Stream[Throwable, V] =
override def streamTrain(dataset: Dataset, limit: Option[Int]): Stream[Throwable, Vec] =
stream(dataset, "train", limit)

override def streamTest[V <: Vec: ElasticsearchCodec](dataset: Dataset, limit: Option[Int]): Stream[Throwable, V] =
override def streamTest(dataset: Dataset, limit: Option[Int]): Stream[Throwable, Vec] =
stream(dataset, "test", limit)
}
}
@@ -72,7 +72,7 @@ object Execute extends App {
parallelism: Int): ZIO[Logging with Clock with DatasetClient with ElastiknnZioClient, Throwable, BenchmarkResult] = {

// Index name is a function of dataset, mapping and holdout so we can check if it already exists and avoid re-indexing.
val trainIndexName = s"ix-${dataset.name}-${MurmurHash3.orderedHash(Seq(dataset, eknnMapping))}"
val trainIndexName = s"ix-${dataset.name}-${MurmurHash3.orderedHash(Seq(dataset, eknnMapping))}".toLowerCase
val testIndexName = s"$trainIndexName-test"

// Create a primary and holdout index with same mappings.
@@ -87,15 +87,15 @@
_ <- eknnClient.execute(createIndex(testIndexName).replicas(0).shards(parallelism).indexSetting("refresh_interval", "-1"))
datasets <- ZIO.access[DatasetClient](_.get)
_ <- log.info(s"Indexing vectors for dataset $dataset")
_ <- datasets.streamTrain[Vec](dataset).grouped(chunkSize).zipWithIndex.foreach {
_ <- datasets.streamTrain(dataset).grouped(chunkSize).zipWithIndex.foreach {
case (vecs, batchIndex) =>
val ids = Some(vecs.indices.map(i => s"$batchIndex-$i"))
for {
(dur, _) <- eknnClient.index(trainIndexName, eknnQuery.field, vecs, ids = ids).timed
_ <- log.debug(s"Indexed batch $batchIndex to $trainIndexName in ${dur.toMillis} ms")
} yield ()
}
_ <- datasets.streamTest[Vec](dataset).grouped(chunkSize).zipWithIndex.foreach {
_ <- datasets.streamTest(dataset).grouped(chunkSize).zipWithIndex.foreach {
case (vecs, batchIndex) =>
val ids = Some(vecs.indices.map(i => s"$batchIndex-$i"))
for {
@@ -67,7 +67,7 @@ object ResultClient {
@tailrec
def readAllKeys(req: ListObjectsV2Request, agg: Vector[String] = Vector.empty): Vector[String] = {
val res = client.listObjectsV2(req)
val keys = res.getObjectSummaries.asScala.toVector.map(_.getKey)
val keys = res.getObjectSummaries.asScala.toVector.map(_.getKey).filter(_.endsWith(".json"))
if (res.isTruncated) readAllKeys(req.withContinuationToken(res.getNextContinuationToken), agg ++ keys)
else agg ++ keys
}
@@ -20,7 +20,7 @@ package object benchmarks {
type ElastiknnZioClient = Has[ElastiknnZioClient.Service]

sealed abstract class Dataset(val dims: Int) {
final def name: String = this.getClass.getSimpleName.toLowerCase.replace("$", "")
def name: String = this.getClass.getSimpleName.replace("$", "")
}
object Dataset {
case object AmazonHome extends Dataset(4096)
@@ -36,8 +36,13 @@
case object AnnbMnist extends Dataset(784)
case object AnnbNyt extends Dataset(256)
case object AnnbSift extends Dataset(128)
case class RandomDenseFloat(override val dims: Int = 1024, count: Int = 10000) extends Dataset(dims)
case class RandomSparseBool(override val dims: Int = 4096, count: Int = 10000) extends Dataset(dims)
case class RandomDenseFloat(override val dims: Int = 1024, train: Int = 50000, test: Int = 1000) extends Dataset(dims) {
override def name: String = s"Random${dims}d${train / 1000}K${test / 1000}K"
}
case class RandomSparseBool(override val dims: Int = 4096, train: Int = 50000, test: Int = 1000, bias: Double = 0.25)
extends Dataset(dims) {
override def name: String = s"Random${dims}d${train / 1000}K${test / 1000}K"
}
}
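// Worked example of the naming scheme above (illustrative values, not from the source):
//   RandomDenseFloat(1024, 50000, 1000).name == "Random1024d50K1K"
//   RandomSparseBool(4096, 50000, 1000).name == "Random4096d50K1K"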

final case class Query(nnq: NearestNeighborsQuery, k: Int)
@@ -91,23 +96,25 @@
"query"
)

private def mappingToAlgorithmName(m: Mapping): String = m match {
case _: SparseBool => s"Exact"
case _: DenseFloat => "exact"
case _: SparseIndexed => "sparse indexed"
case _: JaccardLsh | _: HammingLsh | _: AngularLsh | _: L2Lsh => "lsh"
private def algorithmName(m: Mapping, q: NearestNeighborsQuery): String = m match {
case _: SparseBool => "Exact"
case _: DenseFloat => "Exact"
case _: SparseIndexed => "Sparse indexed"
case _: JaccardLsh | _: HammingLsh | _: AngularLsh | _: L2Lsh =>
q match {
case lsh: NearestNeighborsQuery.LshQuery if lsh.useMLTQuery => "LSH w/ MLT"
case _ => "LSH"
}
}

private def queryToSimilarityName(q: NearestNeighborsQuery): String = q.similarity.toString

def apply(benchmarkResult: BenchmarkResult): AggregateResult = {
val ptile = new Percentile()
val recalls = benchmarkResult.queryResults.map(_.recall).toArray
val durations = benchmarkResult.queryResults.map(_.duration.toDouble).toArray
new AggregateResult(
benchmarkResult.dataset.name,
queryToSimilarityName(benchmarkResult.query),
mappingToAlgorithmName(benchmarkResult.mapping),
benchmarkResult.query.similarity.toString,
algorithmName(benchmarkResult.mapping, benchmarkResult.query),
benchmarkResult.k,
ptile.evaluate(recalls, 0.1).toFloat,
ptile.evaluate(durations, 0.1).toFloat,
5 changes: 5 additions & 0 deletions changelog.md
@@ -1,3 +1,8 @@
- Added an option for LSH queries to use more-like-this (MLT) heuristics to pick a subset of the LSH hashes for retrieving candidate vectors.
Uses Lucene's [MoreLikeThis class](https://lucene.apache.org/core/8_5_0/queries/org/apache/lucene/queries/mlt/MoreLikeThis.html)
to select that subset based on index statistics. This is generally much faster than matching on _all_ of the hashes and
yields comparable recall, but it is disabled by default.
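A minimal sketch of how the new option is exposed through the query API added in this commit; the field name, dimensionality, and random seed are illustrative:

```scala
import com.klibisz.elastiknn.api._
import scala.util.Random

// Illustrative query vector, generated the same way the benchmarks generate random data.
implicit val rng: Random = new Random(0)
val queryVec = Vec.DenseFloat.random(128)

// Default behavior: match candidates against all of the stored LSH hashes.
val lsh = NearestNeighborsQuery.L2Lsh("vec", candidates = 1000, vec = queryVec)

// Opt-in behavior: let Lucene's MoreLikeThis heuristics pick a subset of the hashes
// based on index statistics before retrieving candidates.
val lshMlt = NearestNeighborsQuery.L2Lsh("vec", candidates = 1000, vec = queryVec, useMLTQuery = true)
```

The same `useMLTQuery` flag is exposed on `JaccardLsh`, `HammingLsh`, and `AngularLsh` as well.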
---
- Using ConstantScoreQuery to wrap the TermQueries used for matching hashes in Elastiknn's SparseIndexedQuery and LshQuery.
Improves the SparseIndexedQuery benchmark from ~66 seconds to ~48 seconds.
Improves the LshQuery benchmark from ~37 seconds to ~31 seconds.
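For context, a rough sketch of this optimization using plain Lucene; this is not Elastiknn's actual query code, and the field name and hash terms are made up:

```scala
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, ConstantScoreQuery, TermQuery}

// Hypothetical hash terms produced by hashing one query vector.
val hashes: Seq[String] = Seq("101", "7283", "55021")

val builder = new BooleanQuery.Builder()
hashes.foreach { h =>
  // ConstantScoreQuery assigns the same score to every match,
  // skipping the usual per-term relevance scoring.
  builder.add(new ConstantScoreQuery(new TermQuery(new Term("vec", h))), BooleanClause.Occur.SHOULD)
}
val candidateQuery: BooleanQuery = builder.build()
```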
29 changes: 13 additions & 16 deletions core/src/main/scala/com/klibisz/elastiknn/api/package.scala
@@ -1,5 +1,7 @@
package com.klibisz.elastiknn

import jdk.jfr.Experimental

import scala.util.Random

package object api {
@@ -107,37 +109,32 @@ package object api {
def apply(field: String, similarity: Similarity): SparseIndexed = SparseIndexed(field, Vec.Empty(), similarity)
}

final case class JaccardLsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery {
sealed trait LshQuery extends NearestNeighborsQuery {
def candidates: Int

@Experimental
def useMLTQuery: Boolean
}

final case class JaccardLsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery {
override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v)
override def similarity: Similarity = Similarity.Jaccard
}
object JaccardLsh {
def apply(field: String, candidates: Int): JaccardLsh = JaccardLsh(field, Vec.Empty(), candidates)
}

final case class HammingLsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery {
final case class HammingLsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery {
override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v)
override def similarity: Similarity = Similarity.Hamming
}
object HammingLsh {
def apply(field: String, candidates: Int): HammingLsh = HammingLsh(field, Vec.Empty(), candidates)
}

final case class AngularLsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery {
final case class AngularLsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery {
override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v)
override def similarity: Similarity = Similarity.Angular
}
object AngularLsh {
def apply(field: String, candidates: Int): AngularLsh = AngularLsh(field, Vec.Empty(), candidates)
}

final case class L2Lsh(field: String, vec: Vec, candidates: Int) extends NearestNeighborsQuery {
final case class L2Lsh(field: String, candidates: Int, vec: Vec = Vec.Empty(), useMLTQuery: Boolean = false) extends LshQuery {
override def withVec(v: Vec): NearestNeighborsQuery = copy(vec = v)
override def similarity: Similarity = Similarity.L2
}
object L2Lsh {
def apply(field: String, candidates: Int): L2Lsh = L2Lsh(field, Vec.Empty(), candidates)
}

}
}