From ac08eabca7386dbff1f62a3dcbfc9f4ab4d860aa Mon Sep 17 00:00:00 2001 From: Alex Klibisz Date: Tue, 16 Jun 2020 21:40:13 -0400 Subject: [PATCH] Perf improvements based on benchmarking (without all of the benchmarking code) (#75) Some internal changes based on a lot of benchmarking work. Deleted the benchmarking code from this branch and will merge it as a separate PR. See changelog entry. --- .esopts | 3 +- .github/workflows/ci.yml | 2 +- .github/workflows/release.yml | 2 +- .gitignore | 2 + .minio/.keep | 0 .scalafmt.conf | 2 +- Makefile | 11 +- changelog.md | 7 + .../elastiknn/client/ElastiknnClient.scala | 27 +--- core/build.gradle | 4 +- .../storage/UnsafeSerialization.java | 102 ++++++++++++++ .../klibisz/elastiknn/utils/ArrayUtils.java | 32 ++--- .../elastiknn/api/ElasticsearchCodec.scala | 8 +- .../com/klibisz/elastiknn/api/package.scala | 6 +- .../models/ExactSimilarityFunction.scala | 33 ++--- .../elastiknn/models/LshFunction.scala | 27 ++-- .../klibisz/elastiknn/storage/StoredVec.scala | 77 ++++++++++ docs/pages/api.md | 54 ++++--- es74x/build.gradle | 91 ------------ es74x/gradle.properties | 1 - es74x/src/main/proto/elastiknn/storage.proto | 17 --- .../klibisz/elastiknn/query/ExactQuery.scala | 72 ---------- .../elastiknn/storage/ByteArrayCodec.scala | 51 ------- .../klibisz/elastiknn/storage/VecCache.scala | 34 ----- examples/demo/docker-compose.yml | 2 +- gradle.properties | 2 - gradle/wrapper/gradle-wrapper.properties | 3 +- {es74x => plugin}/Dockerfile | 4 +- plugin/build.gradle | 47 +++++++ plugin/gradle.properties | 1 + .../gradle/wrapper/gradle-wrapper.jar | Bin .../gradle/wrapper/gradle-wrapper.properties | 0 {es74x => plugin}/settings.gradle | 0 .../java/com/klibisz/elastiknn/Empty.java | 0 .../plugin-metadata/plugin-security.policy | 7 + .../klibisz/elastiknn/ElastiKnnPlugin.scala | 0 .../elastiknn/mapper/VectorMapper.scala | 14 +- .../klibisz/elastiknn/query/ExactQuery.scala | 62 ++++++++ .../elastiknn/query/KnnQueryBuilder.scala | 21 ++- .../elastiknn/query/LshFunctionCache.scala | 29 ++-- .../klibisz/elastiknn/query/LshQuery.scala | 62 ++++---- .../elastiknn/query/RewriteQueryBuilder.scala | 0 .../elastiknn/query/SparseIndexedQuery.scala | 8 +- .../elastiknn/ElastiKnnJavaClusterIT.java | 0 .../klibisz/elastiknn/ElastiKnnRestIT.java | 0 .../test/elastiknn/01_plugin_installed.yml | 0 .../test/elastiknn/20_basic_2.yml | 0 reference/build.gradle | 10 +- .../SerializationBenchmark.scala | 132 ++++++++++++++++++ settings.gradle | 3 +- testing/build.gradle | 1 - testing/docker-compose.yml | 6 +- .../com/klibisz/elastiknn/ClusterSpec.scala | 3 +- .../api/ElasticsearchCodecSuite.scala | 3 +- .../elastiknn/mapper/VectorMapperSuite.scala | 6 +- .../storage/UnsafeSerializationSuite.scala | 87 ++++++++++++ 56 files changed, 727 insertions(+), 451 deletions(-) create mode 100644 .minio/.keep create mode 100644 core/src/main/java/com/klibisz/elastiknn/storage/UnsafeSerialization.java create mode 100644 core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala delete mode 100644 es74x/build.gradle delete mode 100644 es74x/gradle.properties delete mode 100644 es74x/src/main/proto/elastiknn/storage.proto delete mode 100644 es74x/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala delete mode 100644 es74x/src/main/scala/com/klibisz/elastiknn/storage/ByteArrayCodec.scala delete mode 100644 es74x/src/main/scala/com/klibisz/elastiknn/storage/VecCache.scala rename {es74x => plugin}/Dockerfile (50%) create mode 100644 plugin/build.gradle create mode 100644 
plugin/gradle.properties rename {es74x => plugin}/gradle/wrapper/gradle-wrapper.jar (100%) rename {es74x => plugin}/gradle/wrapper/gradle-wrapper.properties (100%) rename {es74x => plugin}/settings.gradle (100%) rename {es74x => plugin}/src/main/java/com/klibisz/elastiknn/Empty.java (100%) create mode 100644 plugin/src/main/plugin-metadata/plugin-security.policy rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala (100%) rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala (94%) create mode 100644 plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala (89%) rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala (50%) rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala (56%) rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/query/RewriteQueryBuilder.scala (100%) rename {es74x => plugin}/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala (91%) rename {es74x => plugin}/src/test/java/com/klibisz/elastiknn/ElastiKnnJavaClusterIT.java (100%) rename {es74x => plugin}/src/test/java/com/klibisz/elastiknn/ElastiKnnRestIT.java (100%) rename {es74x => plugin}/src/test/resources/rest-api-spec/test/elastiknn/01_plugin_installed.yml (100%) rename {es74x => plugin}/src/test/resources/rest-api-spec/test/elastiknn/20_basic_2.yml (100%) create mode 100644 reference/src/main/scala/com/klibisz/elastiknn/reference/serialization/SerializationBenchmark.scala create mode 100644 testing/src/test/scala/com/klibisz/elastiknn/storage/UnsafeSerializationSuite.scala diff --git a/.esopts b/.esopts index c408f2c0d..abb53e1bc 100644 --- a/.esopts +++ b/.esopts @@ -1,3 +1,2 @@ -Dtests.heap.size=4G --Dtests.es.thread_pool.write.size=4 --Dtests.es.thread_pool.search.size=4 +-Dtests.es.indices.query.bool.max_clause_count=4096 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 427fe02b1..721320f10 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: - name: Setup Java uses: actions/setup-java@v1 with: - java-version: 12.0.2 + java-version: 14 - name: Setup Ruby uses: actions/setup-ruby@v1 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d967b84a3..bdbe7ff73 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -28,7 +28,7 @@ jobs: - name: Setup Java uses: actions/setup-java@v1 with: - java-version: 12.0.2 + java-version: 14 - name: Setup Ruby uses: actions/setup-ruby@v1 diff --git a/.gitignore b/.gitignore index 064cf0971..893e65c9c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ build target release.md *.csv +.minio/* +!.minio/.keep # Ignore Gradle GUI config gradle-app.setting diff --git a/.minio/.keep b/.minio/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/.scalafmt.conf b/.scalafmt.conf index a0193ecdb..70cb1870c 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,2 +1,2 @@ -version = "2.0.0-RC6" +version = "2.5.2" maxColumn = 140 diff --git a/Makefile b/Makefile index b1350cac8..9bc01434d 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ core = $(pwd)/core vpip = ./venv/bin/pip vpy = ./venv/bin/python gradle = ./gradlew -eslatest = es74x +eslatest = plugin s3 = aws s3 build_bucket = s3://com-klibisz-elastiknn-builds/ dc = docker-compose @@ -76,17 +76,17 @@ run/cluster: .mk/run-cluster run/gradle: cd testing && 
$(dc) down
-	$(gradle) run $(shell cat .esopts | xargs)
+	$(gradle) :plugin:run $(shell cat .esopts | xargs)

run/debug:
	cd testing && $(dc) down
-	$(gradle) run $(shell cat .esopts | xargs) --debug-jvm
+	$(gradle) :plugin:run $(shell cat .esopts | xargs) --debug-jvm

run/kibana:
-	docker run --network host -e ELASTICSEARCH_HOSTS=http://localhost:9200 -p 5601:5601 -d --rm kibana:7.4.0
+	docker run --network host -e ELASTICSEARCH_HOSTS=http://localhost:9200 -p 5601:5601 -d --rm kibana:7.6.2
	docker ps | grep kibana

-run/demo/app: .mk/gradle-publish-local .mk/example-demo-sbt-docker-stage .mk/example-demo-sbt-docker-stage .mk/vm-max-map-count
+run/demo: .mk/gradle-publish-local .mk/example-demo-sbt-docker-stage .mk/vm-max-map-count
	cd examples/demo && \
	PLAY_HTTP_SECRET_KEY=$(shell sha256sum ~/.ssh/id_rsa | cut -d' ' -f1) $(dc) up --build --detach

@@ -161,4 +161,3 @@ publish/docs: .mk/gradle-docs .mk/client-python-docs
 publish/site: .mk/jekyll-site-build
 	mkdir -p docs/_site/docs
 	rsync -av --delete --exclude docs docs/_site/ $(site_srvr):$(site_main)
-
diff --git a/changelog.md b/changelog.md
index 233dd5804..c366531a0 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,3 +1,10 @@
+- Removed the internal vector caching in favor of using `sun.misc.Unsafe` to speed up vector serialization and deserialization.
+  The result is queries that are actually faster _without_ caching than they previously were _with_ caching.
+  This also made it possible to remove the protobuf dependency, which was previously used to serialize vectors.
+- Upgraded the Elasticsearch version from 7.4.0 to 7.6.2.
+  Attempted to use 7.7.1, but the Apache Lucene version used in 7.7.x introduces a performance regression ([Details](https://issues.apache.org/jira/browse/LUCENE-9378)).
+  Switching from Java 13 to 14 also yields a slight speedup for intersections on sparse vectors.
+---
 - Internal change from custom Lucene queries to FunctionScoreQueries. This reduces quite a bit of boilerplate code and surface area for bugs and performance regressions.
 - Add optional progress bar to Python ElastiknnModel.
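For context, a minimal sketch of the serialization round trip described in the first changelog entry (this assumes the `UnsafeSerialization` class added later in this patch; the vector values are made up):

    import com.klibisz.elastiknn.storage.UnsafeSerialization

    // Floats are copied directly between float[] and byte[] heap memory,
    // four bytes per float, with no intermediate objects left to cache.
    val values = Array(0.1f, 0.2f, 0.3f)
    val bytes = UnsafeSerialization.writeFloats(values)
    val decoded = UnsafeSerialization.readFloats(bytes, 0, bytes.length)
    assert(values.sameElements(decoded))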
diff --git a/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala b/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala index e47e3c7d0..555e7e050 100644 --- a/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala +++ b/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala @@ -25,11 +25,7 @@ trait ElastiknnClient[F[_]] extends AutoCloseable { def putMapping(index: String, field: String, mapping: Mapping): F[Response[PutMappingResponse]] = execute(ElastiknnRequests.putMapping(index, field, mapping)) - def index(index: String, - field: String, - vecs: Seq[Vec], - ids: Option[Seq[String]] = None, - refresh: RefreshPolicy = RefreshPolicy.NONE): F[Response[BulkResponse]] = { + def index(index: String, field: String, vecs: Seq[Vec], ids: Option[Seq[String]] = None, refresh: RefreshPolicy = RefreshPolicy.NONE): F[Response[BulkResponse]] = { val reqs = vecs.map(v => ElastiknnRequests.indexVec(index, field, v)) val withIds = ids match { case Some(idSeq) if idSeq.length == reqs.length => @@ -48,8 +44,9 @@ trait ElastiknnClient[F[_]] extends AutoCloseable { object ElastiknnClient { - def futureClient(host: String = "localhost", port: Int = 9200, strictFailure: Boolean = true)( - implicit ec: ExecutionContext): ElastiknnFutureClient = { + final case class StrictFailureException(message: String, cause: Throwable = None.orNull) extends RuntimeException(message, cause) + + def futureClient(host: String = "localhost", port: Int = 9200, strictFailure: Boolean = true)(implicit ec: ExecutionContext): ElastiknnFutureClient = { val rc: RestClient = RestClient.builder(new HttpHost(host, port)).build() val jc: JavaClient = new JavaClient(rc) new ElastiknnFutureClient { @@ -78,27 +75,17 @@ object ElastiknnClient { else bulkResponseItems.head.error match { case Some(err) => - Some( - ElasticError(err.`type`, - err.reason, - Some(err.index_uuid), - Some(err.index), - Some(err.shard.toString), - Seq.empty, - None, - None, - None, - Seq.empty)) + Some(ElasticError(err.`type`, err.reason, Some(err.index_uuid), Some(err.index), Some(err.shard.toString), Seq.empty, None, None, None, Seq.empty)) case None => findBulkError(bulkResponseItems.tail, acc) } if (res.isError) Left(res.error.asException) - else if (res.status != 200) Left(new RuntimeException(s"Returned non-200 response: [$res]")) + else if (res.status != 200) Left(StrictFailureException(s"Returned non-200 response: [$res]")) else res.result match { case bulkResponse: BulkResponse if bulkResponse.hasFailures => findBulkError(bulkResponse.items) match { case Some(err) => Left(err.asException) - case None => Left(new RuntimeException(s"Unknown bulk execution error in response $res")) + case None => Left(StrictFailureException(s"Unknown bulk execution error in response $res")) } case other => Right(other) } diff --git a/core/build.gradle b/core/build.gradle index f574feb0b..5114f09aa 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -15,8 +15,8 @@ plugins { dependencies { runtime "org.scala-lang:scala-library:${scalaVersion}" implementation "org.scala-lang:scala-library:${scalaVersion}" - implementation "io.circe:circe-generic_${scalaVersion}:${circeVersion}" - implementation "io.circe:circe-parser_${scalaVersion}:${circeVersion}" + compile "io.circe:circe-generic_${scalaVersion}:${circeVersion}" + compile "io.circe:circe-parser_${scalaVersion}:${circeVersion}" compile "io.circe:circe-generic-extras_${scalaVersion}:${circeVersion}" } diff 
--git a/core/src/main/java/com/klibisz/elastiknn/storage/UnsafeSerialization.java b/core/src/main/java/com/klibisz/elastiknn/storage/UnsafeSerialization.java
new file mode 100644
index 000000000..b22cf0ca4
--- /dev/null
+++ b/core/src/main/java/com/klibisz/elastiknn/storage/UnsafeSerialization.java
@@ -0,0 +1,102 @@
+package com.klibisz.elastiknn.storage;
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Uses the sun.misc.Unsafe class to serialize int and float arrays for optimal performance, based on benchmarking.
+ * This is largely a simplification of the UnsafeInput and UnsafeOutput classes from the Kryo library.
+ */
+public class UnsafeSerialization {
+
+    public static final int numBytesInInt = 4;
+    public static final int numBytesInFloat = 4;
+
+    private static final UnsafeUtil u = AccessController.doPrivileged((PrivilegedAction<UnsafeUtil>) () -> {
+        try {
+            return new UnsafeUtil();
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to initialize UnsafeSerialization", ex);
+        }
+    });
+
+    /**
+     * Writes an int using a variable-width encoding: 1, 2, or 4 bytes, depending on its magnitude.
+     */
+    public static byte[] writeInt(final int i) {
+        final int a = Math.abs(i);
+        if (a <= Byte.MAX_VALUE) {
+            // putByte and putShort (rather than putInt) avoid writing past the end of the 1- and 2-byte buffers.
+            final byte[] buf = new byte[1];
+            u.unsafe.putByte(buf, u.byteArrayOffset, (byte) i);
+            return buf;
+        } else if (a <= Short.MAX_VALUE) {
+            final byte[] buf = new byte[2];
+            u.unsafe.putShort(buf, u.byteArrayOffset, (short) i);
+            return buf;
+        } else {
+            final byte[] buf = new byte[4];
+            u.unsafe.putInt(buf, u.byteArrayOffset, i);
+            return buf;
+        }
+    }
+
+    /**
+     * Reads an int written by writeInt, using the array length to determine the encoded width.
+     */
+    public static int readInt(final byte[] barr) {
+        if (barr.length == 1) return u.unsafe.getByte(barr, u.byteArrayOffset);
+        else if (barr.length == 2) return u.unsafe.getShort(barr, u.byteArrayOffset);
+        else return u.unsafe.getInt(barr, u.byteArrayOffset);
+    }
+
+    /**
+     * Writes ints to a byte array.
+     * @param iarr ints to serialize.
+     * @return Array of bytes with length (4 * iarr.length).
+     */
+    public static byte[] writeInts(final int[] iarr) {
+        final int bytesLen = iarr.length * numBytesInInt;
+        byte[] buf = new byte[bytesLen];
+        u.unsafe.copyMemory(iarr, u.intArrayOffset, buf, u.byteArrayOffset, bytesLen);
+        return buf;
+    }
+
+    /**
+     * Reads ints from a byte array.
+     */
+    public static int[] readInts(final byte[] barr, final int offset, final int length) {
+        final int[] iarr = new int[length / numBytesInInt];
+        u.unsafe.copyMemory(barr, offset + u.byteArrayOffset, iarr, u.intArrayOffset, length);
+        return iarr;
+    }
+
+    /**
+     * Writes floats to a byte array.
+     */
+    public static byte[] writeFloats(final float[] farr) {
+        final int bytesLen = farr.length * numBytesInFloat;
+        final byte[] buf = new byte[bytesLen];
+        u.unsafe.copyMemory(farr, u.floatArrayOffset, buf, u.byteArrayOffset, bytesLen);
+        return buf;
+    }
+
+    /**
+     * Reads floats from a byte array.
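+     * @param barr byte array containing the serialized floats.
+     * @param offset position in barr at which the floats start.
+     * @param length number of bytes to read; the result contains (length / 4) floats.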
+     */
+    public static float[] readFloats(final byte[] barr, final int offset, final int length) {
+        final float[] farr = new float[length / numBytesInFloat];
+        u.unsafe.copyMemory(barr, offset + u.byteArrayOffset, farr, u.floatArrayOffset, length);
+        return farr;
+    }
+
+    private static class UnsafeUtil {
+        public final Unsafe unsafe;
+        public final long intArrayOffset;
+        public final long floatArrayOffset;
+        public final long byteArrayOffset;
+        public UnsafeUtil() throws NoSuchFieldException, IllegalAccessException {
+            final Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+            f.setAccessible(true);
+            unsafe = (Unsafe) f.get(null);
+            intArrayOffset = unsafe.arrayBaseOffset(int[].class);
+            floatArrayOffset = unsafe.arrayBaseOffset(float[].class);
+            byteArrayOffset = unsafe.arrayBaseOffset(byte[].class);
+        }
+    }
+
+}
diff --git a/core/src/main/java/com/klibisz/elastiknn/utils/ArrayUtils.java b/core/src/main/java/com/klibisz/elastiknn/utils/ArrayUtils.java
index 5821fd370..f72b31de9 100644
--- a/core/src/main/java/com/klibisz/elastiknn/utils/ArrayUtils.java
+++ b/core/src/main/java/com/klibisz/elastiknn/utils/ArrayUtils.java
@@ -1,27 +1,27 @@
 package com.klibisz.elastiknn.utils;
 
 /**
- * Java implementations of some particularly performance-critical code paths.
+ * Java implementations of some hot spots.
  */
 public class ArrayUtils {
 
-    private static void unsortedException(int lit, int big) {
-        throw new IllegalArgumentException(String.format("Called on unsorted array: %d came after %d", lit, big));
-    }
-
-    public static int sortedIntersectionCount(int [] xs, int [] ys) {
+    /**
+     * Compute the number of intersecting (i.e. identical) elements between two int arrays.
+     * IMPORTANT: Assumes the given arrays are already sorted in ascending order and _does not_ check if this is true.
+     * If the given arrays are not sorted, the answer will be wrong.
+     * This is implemented in Java because Scala sometimes boxes the ints, which is unnecessary and far slower.
+     * @param xs sorted array of ints.
+     * @param ys sorted array of ints.
+     * @return The number of identical elements in the two arrays. For example {1,2,3}, {2,3,4} would return 2.
+ */ + public static int sortedIntersectionCount(final int [] xs, final int [] ys) { int n = 0; int xi = 0; int yi = 0; - int xmax = Integer.MIN_VALUE; - int ymax = Integer.MIN_VALUE; while (xi < xs.length && yi < ys.length) { int x = xs[xi]; int y = ys[yi]; - if (x < xmax) unsortedException(x, xmax); - else xmax = x; - if (y < ymax) unsortedException(y, ymax); - else ymax = y; if (x < y) xi += 1; else if (x > y) yi += 1; else { @@ -30,14 +30,6 @@ public static int sortedIntersectionCount(int [] xs, int [] ys) { yi += 1; } } - while(xi < xs.length) { - if (xs[xi] < xmax) unsortedException(xs[xi], xmax); - xi += 1; - } - while(yi < ys.length) { - if (ys[yi] < ymax) unsortedException(ys[yi], ymax); - yi += 1; - } return n; } diff --git a/core/src/main/scala/com/klibisz/elastiknn/api/ElasticsearchCodec.scala b/core/src/main/scala/com/klibisz/elastiknn/api/ElasticsearchCodec.scala index 949644974..a6192580b 100644 --- a/core/src/main/scala/com/klibisz/elastiknn/api/ElasticsearchCodec.scala +++ b/core/src/main/scala/com/klibisz/elastiknn/api/ElasticsearchCodec.scala @@ -114,15 +114,21 @@ object ElasticsearchCodec { esc => implicit val cfg: Configuration = Configuration.default.withSnakeCaseMemberNames ElasticsearchCodec(deriveConfiguredCodec) } + implicit val emptyVec: ESC[Vec.Empty] = { + implicit val cfg: Configuration = Configuration.default.withStrictDecoding + ElasticsearchCodec(deriveConfiguredCodec) + } implicit val vector: ESC[api.Vec] = new ESC[api.Vec] { override def apply(t: Vec): Json = t match { case ixv: Vec.Indexed => encode(ixv) case sbv: Vec.SparseBool => encode(sbv) case dfv: Vec.DenseFloat => encode(dfv) + case emp: Vec.Empty => encode(emp) } + // TODO: Compare performance of .orElse to alternatives that just check for specific json keys. 
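+    // One hypothetical alternative (not benchmarked here): dispatch on a distinguishing key,
+    // e.g. decode SparseBool when "true_indices" is present, DenseFloat when "values" is present,
+    // and Indexed when "index" is present.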
override def apply(c: HCursor): Either[DecodingFailure, Vec] = - denseFloatVector(c).orElse(sparseBoolVector(c)).orElse(indexedVector(c)) + sparseBoolVector(c).orElse(denseFloatVector(c)).orElse(indexedVector(c)).orElse(emptyVec(c)) } implicit val mappingSparseBool: ESC[Mapping.SparseBool] = ElasticsearchCodec(deriveCodec) diff --git a/core/src/main/scala/com/klibisz/elastiknn/api/package.scala b/core/src/main/scala/com/klibisz/elastiknn/api/package.scala index eaf701035..cc7db1c77 100644 --- a/core/src/main/scala/com/klibisz/elastiknn/api/package.scala +++ b/core/src/main/scala/com/klibisz/elastiknn/api/package.scala @@ -26,7 +26,7 @@ package object api { case other: SparseBool => trueIndices.deep == other.trueIndices.deep && totalIndices == other.totalIndices case _ => false } - override def toString: String = s"SparseBool(${trueIndices.take(3).mkString(",")},...,$totalIndices)" + override def toString: String = s"SparseBool(${trueIndices.take(3).mkString(",")},...,${trueIndices.length}/$totalIndices)" } object SparseBool { @@ -57,7 +57,6 @@ package object api { } dp } - } object DenseFloat { @@ -69,6 +68,9 @@ package object api { } final case class Indexed(index: String, id: String, field: String) extends Vec + + private[elastiknn] final case class Empty() extends Vec + } sealed trait Mapping { diff --git a/core/src/main/scala/com/klibisz/elastiknn/models/ExactSimilarityFunction.scala b/core/src/main/scala/com/klibisz/elastiknn/models/ExactSimilarityFunction.scala index 313b62b18..e7ca1fd8e 100644 --- a/core/src/main/scala/com/klibisz/elastiknn/models/ExactSimilarityFunction.scala +++ b/core/src/main/scala/com/klibisz/elastiknn/models/ExactSimilarityFunction.scala @@ -1,7 +1,8 @@ package com.klibisz.elastiknn.models import com.klibisz.elastiknn.api.{Similarity, Vec} -import com.klibisz.elastiknn.utils.ArrayUtils.sortedIntersectionCount +import com.klibisz.elastiknn.storage.StoredVec +import com.klibisz.elastiknn.utils.ArrayUtils._ /** * You can always compute distance between two vectors, but similarity is not always well-defined. 
@@ -14,28 +15,28 @@ import com.klibisz.elastiknn.utils.ArrayUtils.sortedIntersectionCount */ final case class ExactSimilarityScore(score: Double, distance: Double) -sealed trait ExactSimilarityFunction[V <: Vec] extends ((V, V) => Double) { +sealed trait ExactSimilarityFunction[V <: Vec, S <: StoredVec] extends ((V, S) => Double) { def similarity: Similarity override def equals(other: Any): Boolean = other match { - case f: ExactSimilarityFunction[V] => f.similarity == similarity - case _ => false + case f: ExactSimilarityFunction[V, S] => f.similarity == similarity + case _ => false } } object ExactSimilarityFunction { - object Jaccard extends ExactSimilarityFunction[Vec.SparseBool] { + object Jaccard extends ExactSimilarityFunction[Vec.SparseBool, StoredVec.SparseBool] { override def similarity: Similarity = Similarity.Jaccard - override def apply(v1: Vec.SparseBool, v2: Vec.SparseBool): Double = { + override def apply(v1: Vec.SparseBool, v2: StoredVec.SparseBool): Double = { val isec = sortedIntersectionCount(v1.trueIndices, v2.trueIndices) val sim = isec * 1.0 / (v1.trueIndices.length + v2.trueIndices.length - isec) sim } } - object Hamming extends ExactSimilarityFunction[Vec.SparseBool] { + object Hamming extends ExactSimilarityFunction[Vec.SparseBool, StoredVec.SparseBool] { override def similarity: Similarity = Similarity.Hamming - override def apply(v1: Vec.SparseBool, v2: Vec.SparseBool): Double = { + override def apply(v1: Vec.SparseBool, v2: StoredVec.SparseBool): Double = { val eqTrueCount = sortedIntersectionCount(v1.trueIndices, v2.trueIndices) val totalCount = v1.totalIndices val v1TrueCount = v1.trueIndices.length @@ -45,9 +46,9 @@ object ExactSimilarityFunction { sim } } - object L1 extends ExactSimilarityFunction[Vec.DenseFloat] { + object L1 extends ExactSimilarityFunction[Vec.DenseFloat, StoredVec.DenseFloat] { override def similarity: Similarity = Similarity.L1 - override def apply(v1: Vec.DenseFloat, v2: Vec.DenseFloat): Double = { + override def apply(v1: Vec.DenseFloat, v2: StoredVec.DenseFloat): Double = { var sumAbsDiff: Double = 0.0 var i = 0 while (i < v1.values.length) { @@ -57,9 +58,9 @@ object ExactSimilarityFunction { 1.0 / sumAbsDiff.max(1e-6) } } - object L2 extends ExactSimilarityFunction[Vec.DenseFloat] { + object L2 extends ExactSimilarityFunction[Vec.DenseFloat, StoredVec.DenseFloat] { override def similarity: Similarity = Similarity.L2 - override def apply(v1: Vec.DenseFloat, v2: Vec.DenseFloat): Double = { + override def apply(v1: Vec.DenseFloat, v2: StoredVec.DenseFloat): Double = { var sumSqrDiff: Double = 0.0 var i = 0 while (i < v1.values.length) { @@ -71,12 +72,12 @@ object ExactSimilarityFunction { 1.0 / dist.max(1e-6) } } - object Angular extends ExactSimilarityFunction[Vec.DenseFloat] { + object Angular extends ExactSimilarityFunction[Vec.DenseFloat, StoredVec.DenseFloat] { override def similarity: Similarity = Similarity.Angular - override def apply(v1: Vec.DenseFloat, v2: Vec.DenseFloat): Double = { + override def apply(v1: Vec.DenseFloat, v2: StoredVec.DenseFloat): Double = { var dotProd: Double = 0 - var v1SqrSum: Double = 0 - var v2SqrSum: Double = 0 + var v1SqrSum: Double = 1e-16 // Prevent NaNs. 
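+      // A strictly positive lower bound keeps the similarity's denominator non-zero
+      // when either vector is all zeros, without measurably changing other scores.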
+      var v2SqrSum: Double = 1e-16
       var i = 0
       while (i < v1.values.length) {
         dotProd += v1.values(i) * v2.values(i)
diff --git a/core/src/main/scala/com/klibisz/elastiknn/models/LshFunction.scala b/core/src/main/scala/com/klibisz/elastiknn/models/LshFunction.scala
index ea62bae9b..98c6181ba 100644
--- a/core/src/main/scala/com/klibisz/elastiknn/models/LshFunction.scala
+++ b/core/src/main/scala/com/klibisz/elastiknn/models/LshFunction.scala
@@ -1,12 +1,14 @@
 package com.klibisz.elastiknn.models
 
 import com.klibisz.elastiknn.api.{Mapping, Vec}
+import com.klibisz.elastiknn.storage
+import com.klibisz.elastiknn.storage.StoredVec
 
 import scala.util.Random
 
-sealed trait LshFunction[M <: Mapping, V <: Vec] extends (V => Array[Int]) {
+sealed trait LshFunction[M <: Mapping, V <: Vec, S <: StoredVec] extends (V => Array[Int]) {
   val mapping: M
-  val exact: ExactSimilarityFunction[V]
+  val exact: ExactSimilarityFunction[V, S]
 }
 
 object LshFunction {
@@ -29,9 +31,10 @@ object LshFunction {
    * bands: number of bands, each containing `rows` hash functions. Generally, more bands yield higher recall.
    * rows: number of rows in each band. Generally, more rows yield higher precision.
    */
-  final class Jaccard(override val mapping: Mapping.JaccardLsh) extends LshFunction[Mapping.JaccardLsh, Vec.SparseBool] {
+  final class Jaccard(override val mapping: Mapping.JaccardLsh)
+      extends LshFunction[Mapping.JaccardLsh, Vec.SparseBool, StoredVec.SparseBool] {
 
-    override val exact: ExactSimilarityFunction[Vec.SparseBool] = ExactSimilarityFunction.Jaccard
+    override val exact: ExactSimilarityFunction[Vec.SparseBool, StoredVec.SparseBool] = ExactSimilarityFunction.Jaccard
 
     import mapping._
     private val rng: Random = new Random(0)
@@ -75,8 +78,9 @@ object LshFunction {
    * @param mapping HammingLsh Mapping. The members are used as follows:
    *                bits: determines the number of randomly sampled indices.
    */
-  final class Hamming(override val mapping: Mapping.HammingLsh) extends LshFunction[Mapping.HammingLsh, Vec.SparseBool] {
-    override val exact: ExactSimilarityFunction[Vec.SparseBool] = ExactSimilarityFunction.Hamming
+  final class Hamming(override val mapping: Mapping.HammingLsh)
+      extends LshFunction[Mapping.HammingLsh, Vec.SparseBool, StoredVec.SparseBool] {
+    override val exact: ExactSimilarityFunction[Vec.SparseBool, StoredVec.SparseBool] = ExactSimilarityFunction.Hamming
 
     import mapping._
     private val rng: Random = new Random(0)
@@ -123,8 +127,9 @@ object LshFunction {
    * bands: number of bands, each containing `rows` hash functions. Generally, more bands yield higher recall.
    * rows: number of rows per band. Generally, more rows yield higher precision.
    */
-  final class Angular(override val mapping: Mapping.AngularLsh) extends LshFunction[Mapping.AngularLsh, Vec.DenseFloat] {
-    override val exact: ExactSimilarityFunction[Vec.DenseFloat] = ExactSimilarityFunction.Angular
+  final class Angular(override val mapping: Mapping.AngularLsh)
+      extends LshFunction[Mapping.AngularLsh, Vec.DenseFloat, StoredVec.DenseFloat] {
+    override val exact: ExactSimilarityFunction[Vec.DenseFloat, StoredVec.DenseFloat] = ExactSimilarityFunction.Angular
 
     import mapping._
     private implicit val rng: Random = new Random(0)
@@ -164,12 +169,12 @@ object LshFunction {
    * bands: number of bands, each containing `rows` hash functions. Generally, more bands yield higher recall.
    *        Note that this is often referred to as `L`, or the number of hash tables.
    * rows: number of rows per band. Generally, more rows yield higher precision.
-  *       Note that this is oten called `k`, or the number of functions per hash table.
+  *       Note that this is often called `k`, or the number of functions per hash table.
    * width: width of the interval that determines whether two floating-point hashed values are equivalent.
    *
    */
-  final class L2(override val mapping: Mapping.L2Lsh) extends LshFunction[Mapping.L2Lsh, Vec.DenseFloat] {
-    override val exact: ExactSimilarityFunction[Vec.DenseFloat] = ExactSimilarityFunction.L2
+  final class L2(override val mapping: Mapping.L2Lsh) extends LshFunction[Mapping.L2Lsh, Vec.DenseFloat, StoredVec.DenseFloat] {
+    override val exact: ExactSimilarityFunction[Vec.DenseFloat, StoredVec.DenseFloat] = ExactSimilarityFunction.L2
 
     import mapping._
     private implicit val rng: Random = new Random(0)
diff --git a/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala b/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala
new file mode 100644
index 000000000..17a2b0194
--- /dev/null
+++ b/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala
@@ -0,0 +1,77 @@
+package com.klibisz.elastiknn.storage
+
+import com.klibisz.elastiknn.api.Vec
+
+/**
+ * Abstraction for different vector storage layouts and typeclasses for encoding/decoding them.
+ * This is decoupled from the api Vec case classes so we can support various optimizations that might change the
+ * interface, e.g. streaming the vectors in a read-once fashion. Currently the fastest storage methods support roughly
+ * the same interface.
+ *
+ * The current default serialization method uses sun.misc.Unsafe to eke out the best possible performance.
+ * The implementation is based mostly on the Kryo library's use of sun.misc.Unsafe.
+ * Many other options were considered:
+ * - fast-serialization library with unsafe configuration - roughly the same as using Unsafe.
+ * - kryo library with unsafe input/output - a bit slower than fast-serialization and bare Unsafe.
+ * - java.io.ObjectOutput/InputStreams - 30-40% slower than Unsafe, but by far the best vanilla JVM solution.
+ * - protocol buffers - roughly the same as ObjectOutput/InputStreams, but with smaller byte array sizes; the size
+ *   doesn't seem to matter as it's compressed by ES anyway.
+ * - java.io.DataOutput/InputStreams - far slower.
+ * - scodec - far slower.
+ *
+ * Anything using Unsafe comes with the tradeoff that it requires extra JVM security permissions.
+ * If this becomes a problem we should likely switch to ObjectOutput/InputStreams.
+ */
+sealed trait StoredVec
+
+object StoredVec {
+
+  sealed trait SparseBool extends StoredVec {
+    val trueIndices: Array[Int]
+  }
+
+  sealed trait DenseFloat extends StoredVec {
+    val values: Array[Float]
+  }
+
+  /**
+   * Typeclasses for converting api vecs to stored vecs.
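+   * For example, a round trip through the derived codec (a sketch; assumes the implicit
+   * Encoder and Decoder instances defined below are in scope):
+   * {{{
+   *   val codec = implicitly[Codec[Vec.DenseFloat, StoredVec.DenseFloat]]
+   *   val bytes = codec.encode(Vec.DenseFloat(Array(1f, 2f, 3f)))
+   *   val stored = codec.decode(bytes, 0, bytes.length)
+   * }}}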
+ */ + trait Codec[V <: Vec, S <: StoredVec] { + def decode(barr: Array[Byte], offset: Int, length: Int): S + def encode(vec: V): Array[Byte] + } + + object Codec { + implicit def derived[V <: Vec: Encoder, S <: StoredVec: Decoder]: Codec[V, S] = + new Codec[V, S] { + override def decode(barr: Array[Byte], offset: Int, length: Int): S = implicitly[Decoder[S]].apply(barr, offset, length) + override def encode(vec: V): Array[Byte] = implicitly[Encoder[V]].apply(vec) + } + } + + trait Decoder[S <: StoredVec] { + def apply(barr: Array[Byte], offset: Int, length: Int): S + } + + object Decoder { + implicit val sparseBool: Decoder[SparseBool] = (barr: Array[Byte], offset: Int, length: Int) => + new SparseBool { + override val trueIndices: Array[Int] = UnsafeSerialization.readInts(barr, offset, length) + } + implicit val denseFloat: Decoder[DenseFloat] = (barr: Array[Byte], offset: Int, length: Int) => + new DenseFloat { + override val values: Array[Float] = UnsafeSerialization.readFloats(barr, offset, length) + } + } + + trait Encoder[V <: Vec] { + def apply(vec: V): Array[Byte] + } + + object Encoder { + implicit val sparseBool: Encoder[Vec.SparseBool] = (vec: Vec.SparseBool) => UnsafeSerialization.writeInts(vec.trueIndices) + implicit val denseFloat: Encoder[Vec.DenseFloat] = (vec: Vec.DenseFloat) => UnsafeSerialization.writeFloats(vec.values) + } + +} diff --git a/docs/pages/api.md b/docs/pages/api.md index 432719d6c..7decb531e 100644 --- a/docs/pages/api.md +++ b/docs/pages/api.md @@ -18,7 +18,8 @@ Once you've [installed Elastiknn](/installation/), you can use the REST API just ## Mappings -Before indexing vectors you must define a mapping specifying one of two vector datatypes and a few other properties. These determine how vectors are indexed to support different kinds of searches. +Before indexing vectors you must define a mapping specifying one of two vector datatypes and a few other properties. +These determine how vectors are indexed to support different kinds of searches. The general structure for specifying a mapping looks like this: @@ -50,7 +51,9 @@ PUT /my-index/_mapping ### elastiknn_sparse_bool_vector Datatype -This type is optimized for vectors where each index is either `true` or `false` and the majority of indices are `false`. For example, you might represent a bag-of-words encoding of a document, where each index corresponds to a word in a vocabulary and any single document contains a very small fraction of all words. Internally, Elastiknn saves space by only storing a list of the true indices. +This type is optimized for vectors where each index is either `true` or `false` and the majority of indices are `false`. +For example, you might represent a bag-of-words encoding of a document, where each index corresponds to a word in a vocabulary and any single document contains a very small fraction of all words. +Internally, Elastiknn saves space by only storing the true indices. ```json PUT /my-index/_mapping @@ -75,7 +78,9 @@ PUT /my-index/_mapping ### elastiknn_dense_float_vector Datatype -This type is optimized for vectors where each index is a floating point number, all of the indices are populated, and the dimensionality usually doesn't exceed ~1000. For example, you might store a word embedding or an image vector. Internally, Elastiknn uses Java Floats to store the values. +This type is optimized for vectors where each index is a floating point number, all of the indices are populated, and the dimensionality usually doesn't exceed ~1000. 
+For example, you might store a word embedding or an image vector. +Internally, Elastiknn uses Java Floats to store the values. ```json PUT /my-index/_mapping @@ -100,7 +105,8 @@ PUT /my-index/_mapping ### Exact Mapping -The exact model will allow you to run exact searches. These don't levarage any indexing constructs and have `O(n^2)` runtime, where `n` is the total number of documents. +The exact model will allow you to run exact searches. +These don't leverage any indexing constructs and have `O(n^2)` runtime, where `n` is the total number of documents. You don't need to supply any `"model": "..."` value or any model parameters to use this model. @@ -125,7 +131,9 @@ PUT /my-index/_mapping ### Sparse Indexed Mapping -The sparse indexed model introduces an obvious optimization for exact queries on sparse bool vectors. It indexes each of of true indices as a Lucene term, basically treating them like [Elasticsearch keywords](https://www.elastic.co/guide/en/elasticsearch/reference/current/keyword.html). Jaccard and Hamming similarity both require computing the intersection of the query vector against all indexed vectors, and indexing the true indices makes this operation much more efficient. However, you must consider that there is an upper bound on the number of possible terms in a term query, [see the `index.max_terms_count` setting.](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html#index-max-terms-count) If the number of true indices in your vectors exceeds this limit, you'll have to adjust it or you'll encounter failed queries. +The sparse indexed model introduces an obvious optimization for exact queries on sparse bool vectors. +It indexes each of the true indices as a Lucene term, basically treating them like [Elasticsearch keywords](https://www.elastic.co/guide/en/elasticsearch/reference/current/keyword.html). Jaccard and Hamming similarity both require computing the intersection of the query vector against all indexed vectors, and indexing the true indices makes this operation much more efficient. However, you must consider that there is an upper bound on the number of possible terms in a term query, [see the `index.max_terms_count` setting.](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html#index-max-terms-count) +If the number of true indices in your vectors exceeds this limit, you'll have to adjust it or you'll encounter failed queries. ```json PUT /my-index/_mapping @@ -630,28 +638,40 @@ The tables below show valid model/query combinations. Rows are models and column ## Miscellaneous Implementation Details -Here are some other things worth knowing. Perhaps there will be a more cohesive way to present these in the future. +Here are some other things worth knowing. +Perhaps there will be a more cohesive way to present these in the future. ### Storing Model Parameters -The LSH models all use randomized parameters to hash vectors. The simplest example is the bit-sampling model for Hamming similarity, which is parameterized by a list of randomly sampled indices. A more complicated example is the stable distributions model for L2 similarity, which is parameterized by a set of random unit vectors and a set of random bias values. These parameters aren't actually stored anywhere in Elasticsearch. Rather, they are lazily re-computed from a fixed random seed each time they are needed. 
The advantage of this is that you don't have to worry about storing and synchronizing potentially large parameter documents somewhere in the cluster. The disadvantage is that it's expensive to re-compute the randomized parameters. So instead we keep an internal cache of models, keyed on the model hyperparameters (e.g. `bands`, `rows`, etc.). The hyperparameters are stored inside the mappings where they are originally defined. +The LSH models all use randomized parameters to hash vectors. +The simplest example is the bit-sampling model for Hamming similarity, which is parameterized by a list of randomly sampled indices. +A more complicated example is the stable distributions model for L2 similarity, which is parameterized by a set of random unit vectors and a set of random bias values. +These parameters aren't actually stored anywhere in Elasticsearch. +Rather, they are lazily re-computed from a fixed random seed each time they are needed. +The advantage of this is that you don't have to worry about storing and synchronizing potentially large parameter documents somewhere in the cluster. +The disadvantage is that it's expensive to re-compute the randomized parameters. +So instead we keep an internal cache of models, keyed on the model hyperparameters (e.g. `bands`, `rows`, etc.). +The hyperparameters are stored inside the mappings where they are originally defined. ### Transforming and Indexing Vectors -Each vector is transformed (e.g. hashed) based on its mapping when the user makes an indexing request. All vectors index a binary [doc values field](https://www.elastic.co/guide/en/elasticsearch/reference/current/doc-values.html) containing a serialized version of the vector, as well as term fields based on the vector's mapping. For example, for a sparse bool vector with a Jaccard LSH mapping, Elastiknn indexes the exact vector as a byte array in a doc values field and the vector's hash values as a set of Lucene Terms which point back to the document and field containing the vector. All of this transformation is part of the implementation for the `elastiknn_sparse_bool_vector` and `elastiknn_dense_float_vector` datatypes. +Each vector is transformed (e.g. hashed) based on its mapping when the user makes an indexing request. +All vectors index a binary [doc values field](https://www.elastic.co/guide/en/elasticsearch/reference/current/doc-values.html) containing a serialized version of the vector, as well as term fields based on the vector's mapping. +For example, for a sparse bool vector with a Jaccard LSH mapping, Elastiknn indexes the exact vector as a byte array in a doc values field and the vector's hash values as a set of Lucene Terms which point back to the document and field containing the vector. +All of this transformation is part of the implementation for the `elastiknn_sparse_bool_vector` and `elastiknn_dense_float_vector` datatypes. ### Caching Mappings -When a user submits an `elastiknn_nearest_neighbors` query, Elastiknn has to retrieve the mapping for the indexed vector field in order to validate and hash the query vector. Mappings are typically static, so Elastiknn keeps an in-memory cache of mappings with a one minute expiration to avoid repeatedly requesting an unchanged mapping for every query. This cache is local to each Elasticsearch node. +When a user submits an `elastiknn_nearest_neighbors` query, Elastiknn has to retrieve the mapping for the indexed vector field in order to validate and hash the query vector. 
+Mappings are typically static, so Elastiknn keeps an in-memory cache of mappings with a one-minute expiration to avoid repeatedly requesting an unchanged mapping for every query. +This cache is local to each Elasticsearch node. -The practical implication is that if you intend to delete and re-create an index with different Elastiknn mappings, you should wait more than 60 seconds between deleting and running new queries. In reality it usually takes much longer than one minute to delete, re-create, and populate an index. - -### Caching Vectors - -To compute exact similarities, Elastiknn has to retrieve the serialized version of each indexed vector, deserialize it, and instantiate a new object. Elasticsearch documents are typically static, especially while running high volumes of queries. So Elastiknn keeps an in-memory cache of deserialized vectors with a one minute expiration on each Elasticsearch node. - -The practical implication is that you should wait more than one minute between updating vector contents and running queries that might access the modified vectors. +The practical implication is that if you intend to delete and re-create an index with different Elastiknn mappings, you should wait more than 60 seconds between deleting and running new queries. +In reality it usually takes much longer than one minute to delete, re-create, and populate an index. ### Parallelism -From Elasticsearch's perspective, the `elastiknn_nearest_neighbors` query is no different than any other query. Elasticsearch receives a JSON query containing an `elastiknn_nearest_neighbors` key, passes the JSON to a parser implemented by Elastiknn, the parser produces a Lucene query, and Elasticsearch executes that query on each shard in the index. This means the simplest way to increase query parallelism is to add shards to your index. Obviously this has an upper limit, but the general performance implications of sharding are beyond the scope of this document. +From Elasticsearch's perspective, the `elastiknn_nearest_neighbors` query is no different than any other query. +Elasticsearch receives a JSON query containing an `elastiknn_nearest_neighbors` key, passes the JSON to a parser implemented by Elastiknn, the parser produces a Lucene query, and Elasticsearch executes that query on each shard in the index. +This means the simplest way to increase query parallelism is to add shards to your index. +Obviously this has an upper limit, but the general performance implications of sharding are beyond the scope of this document. 
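To make the sharding point above concrete, here is a sketch using the elastic4s DSL that the client module already depends on (the index name and shard count are arbitrary, and `client` stands in for an `ElastiknnFutureClient`):

    import com.sksamuel.elastic4s.ElasticDsl._

    // Each shard executes the Lucene query produced by an elastiknn_nearest_neighbors
    // query independently, so four primary shards allow up to four parallel executions.
    client.execute(createIndex("my-index").shards(4).replicas(0))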
diff --git a/es74x/build.gradle b/es74x/build.gradle deleted file mode 100644 index d10c49e5d..000000000 --- a/es74x/build.gradle +++ /dev/null @@ -1,91 +0,0 @@ - -buildscript { - repositories { - mavenLocal() - mavenCentral() - jcenter() - } - dependencies { - classpath "org.elasticsearch.gradle:build-tools:${esVersion}" - classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8' - } -} - -plugins { - id 'com.google.protobuf' version '0.8.8' -} - -version = "${version}_es${esVersion}" - -apply plugin: 'elasticsearch.esplugin' - -licenseFile = rootProject.file("LICENSE.txt") -noticeFile = rootProject.file("NOTICE.txt") - -dependencyLicenses.enabled = false -thirdPartyAudit.enabled = false -licenseHeaders.enabled = false -testingConventions.enabled = false - -configurations { - all { - resolutionStrategy.preferProjectModules() - } -} - -dependencies { - runtime project(':core') - implementation project(':core') - runtime "org.scala-lang:scala-library:${scalaVersion}" - implementation "org.scala-lang:scala-library:${scalaVersion}" - runtime "com.thesamet.scalapb:scalapb-runtime_${scalaVersion}:${scalapbVersion}" - implementation "com.thesamet.scalapb:scalapb-runtime_${scalaVersion}:${scalapbVersion}" - implementation "com.google.guava:guava:28.1-jre" - runtime "com.google.guava:guava:28.1-jre" -} - -protobuf { - protoc { - artifact = 'com.google.protobuf:protoc:3.10.0' - } - plugins { - scalapb { - artifact = (org.gradle.nativeplatform.platform.internal.DefaultNativePlatform.getCurrentOperatingSystem().isWindows()) ? - "com.thesamet.scalapb:protoc-gen-scala:${scalapbVersion}:windows@bat" : - "com.thesamet.scalapb:protoc-gen-scala:${scalapbVersion}:unix@sh" - } - } - - generateProtoTasks { - all().each { task -> - task.builtins { - remove java - } - task.plugins { - scalapb { - // add any ScalaPB generator options here. See: https://scalapb.github.io/scalapbc.html#passing-generator-parameters - option 'flat_package' - option 'no_lenses' - } - } - } - } -} - -// Add geneated Scala code as a source directory -sourceSets { - main { - scala { - srcDirs "${protobuf.generatedFilesBaseDir}/main/scalapb" - } - } -} - -esplugin { - name 'elastiknn' - description 'Ingest processor and queries for exact and approximate nearest neighbors search' - classname 'com.klibisz.elastiknn.ElastiKnnPlugin' - extendedPlugins = ['lang-painless'] - licenseFile rootProject.file('LICENSE.txt') - noticeFile rootProject.file('NOTICE.txt') -} \ No newline at end of file diff --git a/es74x/gradle.properties b/es74x/gradle.properties deleted file mode 100644 index 7d5cc6b62..000000000 --- a/es74x/gradle.properties +++ /dev/null @@ -1 +0,0 @@ -esVersion = 7.4.0 \ No newline at end of file diff --git a/es74x/src/main/proto/elastiknn/storage.proto b/es74x/src/main/proto/elastiknn/storage.proto deleted file mode 100644 index 4386b3f9e..000000000 --- a/es74x/src/main/proto/elastiknn/storage.proto +++ /dev/null @@ -1,17 +0,0 @@ -syntax = "proto3"; - -package klibisz.elastiknn.storage; - -option java_package = "com.klibisz.elastiknn.storage"; - -import "scalapb/scalapb.proto"; - -// Protobuf objects for serializing vectors in the elasticsearch index. 
-message SparseBoolVector { - repeated uint32 true_indices = 1 [(scalapb.field).collection_type="Array"]; - uint32 total_indices = 2; -} - -message DenseFloatVector { - repeated float values = 1 [(scalapb.field).collection_type="Array"]; -} diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala b/es74x/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala deleted file mode 100644 index 84238d49c..000000000 --- a/es74x/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala +++ /dev/null @@ -1,72 +0,0 @@ -package com.klibisz.elastiknn.query - -import java.util.Objects - -import com.klibisz.elastiknn.ELASTIKNN_NAME -import com.klibisz.elastiknn.api.{ElasticsearchCodec, Vec} -import com.klibisz.elastiknn.models.ExactSimilarityFunction -import com.klibisz.elastiknn.storage.ByteArrayCodec -import com.klibisz.elastiknn.storage.VecCache.ContextCache -import org.apache.lucene.document.BinaryDocValuesField -import org.apache.lucene.index.{IndexableField, LeafReaderContext} -import org.apache.lucene.search.{DocValuesFieldExistsQuery, Explanation} -import org.apache.lucene.util.BytesRef -import org.elasticsearch.common.lucene.search.function._ - -object ExactQuery { - - private class ExactScoreFunction[V <: Vec: ByteArrayCodec: ElasticsearchCodec](val field: String, - val queryVec: V, - val simFunc: ExactSimilarityFunction[V], - val ctxCache: ContextCache[V]) - extends ScoreFunction(CombineFunction.REPLACE) { - - override def getLeafScoreFunction(ctx: LeafReaderContext): LeafScoreFunction = { - val vecDocVals = ctx.reader.getBinaryDocValues(vectorDocValuesField(field)) - val docIdCache = ctxCache.get(ctx) - new LeafScoreFunction { - override def score(docId: Int, subQueryScore: Float): Double = { - val storedVec = docIdCache.get( - docId, - () => { - if (vecDocVals.advanceExact(docId)) { - val binaryValue = vecDocVals.binaryValue() - val vecBytes = binaryValue.bytes.take(binaryValue.length) - implicitly[ByteArrayCodec[V]].apply(vecBytes).get - } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") - } - ) - simFunc(queryVec, storedVec).toFloat - } - - override def explainScore(docId: Int, subQueryScore: Explanation): Explanation = - Explanation.`match`(100, "Computing exact similarity scores for a query vector against _all_ indexed vectors.") - } - } - - override def needsScores(): Boolean = false // TODO: maybe it does? - - override def doEquals(other: ScoreFunction): Boolean = other match { - case f: ExactScoreFunction[V] => field == f.field && queryVec == f.queryVec && simFunc == f.simFunc && ctxCache == f.ctxCache - case _ => false - } - - override def doHashCode(): Int = Objects.hash(field, queryVec, simFunc, ctxCache) - } - - def apply[V <: Vec: ByteArrayCodec: ElasticsearchCodec](field: String, - queryVec: V, - simFunc: ExactSimilarityFunction[V], - cache: ContextCache[V]): FunctionScoreQuery = { - val subQuery = new DocValuesFieldExistsQuery(vectorDocValuesField(field)) - new FunctionScoreQuery(subQuery, new ExactScoreFunction(field, queryVec, simFunc, cache)) - } - - // Docvalue fields can have a custom name, but "regular" values (e.g. Terms) must keep the name of the field. 
- def vectorDocValuesField(field: String): String = s"$field.$ELASTIKNN_NAME.vector" - - def index[V <: Vec: ByteArrayCodec](field: String, vec: V): Seq[IndexableField] = { - Seq(new BinaryDocValuesField(vectorDocValuesField(field), new BytesRef(implicitly[ByteArrayCodec[V]].apply(vec)))) - } - -} diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/storage/ByteArrayCodec.scala b/es74x/src/main/scala/com/klibisz/elastiknn/storage/ByteArrayCodec.scala deleted file mode 100644 index c90541d16..000000000 --- a/es74x/src/main/scala/com/klibisz/elastiknn/storage/ByteArrayCodec.scala +++ /dev/null @@ -1,51 +0,0 @@ -package com.klibisz.elastiknn.storage - -import com.google.protobuf.wrappers.Int32Value -import com.klibisz.elastiknn.api.Vec -import scalapb.descriptors.{PInt, PRepeated} - -import scala.util.Try - -// Codec for writing/reading objects to/from byte arrays. Mostly used for storing values. -trait ByteArrayCodec[T] { - def apply(t: T): Array[Byte] - def apply(a: Array[Byte]): Try[T] -} - -object ByteArrayCodec { - - def encode[T: ByteArrayCodec](t: T): Array[Byte] = implicitly[ByteArrayCodec[T]].apply(t) - def decode[T: ByteArrayCodec](a: Array[Byte]): Try[T] = implicitly[ByteArrayCodec[T]].apply(a) - - implicit val sparseBoolVector: ByteArrayCodec[Vec.SparseBool] = new ByteArrayCodec[Vec.SparseBool] { - override def apply(t: Vec.SparseBool): Array[Byte] = SparseBoolVector(t.trueIndices, t.totalIndices).toByteArray - override def apply(a: Array[Byte]): Try[Vec.SparseBool] = Try { - val stored = SparseBoolVector.parseFrom(a) - Vec.SparseBool(stored.trueIndices, stored.totalIndices) - } - } - - implicit val denseFloatVector: ByteArrayCodec[Vec.DenseFloat] = new ByteArrayCodec[Vec.DenseFloat] { - override def apply(t: Vec.DenseFloat): Array[Byte] = { - val dfv = DenseFloatVector(t.values) - val barr = dfv.toByteArray - barr - } - override def apply(a: Array[Byte]): Try[Vec.DenseFloat] = Try { - val stored = DenseFloatVector.parseFrom(a) - Vec.DenseFloat(stored.values) - } - } - - implicit val int: ByteArrayCodec[Int] = new ByteArrayCodec[Int] { - override def apply(t: Int): Array[Byte] = { - // Another way to do it. Not sure which is faster, but using the protobufs is easier to decode. 
- // val buf = java.nio.ByteBuffer.allocate(4) - // buf.putInt(t) - // buf.array() - Int32Value(t).toByteArray - } - override def apply(a: Array[Byte]): Try[Int] = Try(Int32Value.parseFrom(a).value) - } - -} diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/storage/VecCache.scala b/es74x/src/main/scala/com/klibisz/elastiknn/storage/VecCache.scala deleted file mode 100644 index 86bf214c3..000000000 --- a/es74x/src/main/scala/com/klibisz/elastiknn/storage/VecCache.scala +++ /dev/null @@ -1,34 +0,0 @@ -package com.klibisz.elastiknn.storage - -import java.util.concurrent.TimeUnit - -import com.google.common.cache.{Cache, CacheBuilder, CacheLoader, LoadingCache} -import com.klibisz.elastiknn.api.Vec -import org.apache.lucene.index.LeafReaderContext - -private[elastiknn] object VecCache { - - type DocIdCache[V <: Vec] = Cache[Integer, V] - type ContextCache[V <: Vec] = LoadingCache[LeafReaderContext, DocIdCache[V]] - type IndexFieldCache[V <: Vec] = LoadingCache[(String, String), ContextCache[V]] - - private def build[V <: Vec]: IndexFieldCache[V] = - CacheBuilder.newBuilder - .expireAfterWrite(1, TimeUnit.MINUTES) - .build(new CacheLoader[(String, String), ContextCache[V]] { - override def load(key: (String, String)): LoadingCache[LeafReaderContext, DocIdCache[V]] = - CacheBuilder.newBuilder - .expireAfterWrite(1, TimeUnit.MINUTES) - .build(new CacheLoader[LeafReaderContext, DocIdCache[V]] { - override def load(key: LeafReaderContext): DocIdCache[V] = - CacheBuilder.newBuilder.expireAfterWrite(1, TimeUnit.MINUTES).build[Integer, V] - }) - }) - - private val sbv: IndexFieldCache[Vec.SparseBool] = build[Vec.SparseBool] - private val dfv: IndexFieldCache[Vec.DenseFloat] = build[Vec.DenseFloat] - - def SparseBool(index: String, field: String): ContextCache[Vec.SparseBool] = sbv.get((index, field)) - def DenseFloat(index: String, field: String): ContextCache[Vec.DenseFloat] = dfv.get((index, field)) - -} diff --git a/examples/demo/docker-compose.yml b/examples/demo/docker-compose.yml index 7b79ff336..a324e1cc4 100644 --- a/examples/demo/docker-compose.yml +++ b/examples/demo/docker-compose.yml @@ -20,7 +20,7 @@ services: # Single elasticsearch container. 
 elasticsearch:
   build:
-    context: ${ES_CONTEXT:-../../es74x}
+    context: ${ES_CONTEXT:-../../plugin}
     dockerfile: Dockerfile
   container_name: elasticsearch
   ports:
diff --git a/gradle.properties b/gradle.properties
index 355d4f58c..8e8652013 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,6 +2,4 @@ pluginName=elastiknn
 group=com.klibisz.elastiknn
 scalaVersion=2.12
-scalapbVersion=0.9.6
-scalapbCirceVersion=0.6.0
 circeVersion=0.13.0
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index e2016e8a8..43f6317a1 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,4 @@
-#Sun Dec 08 19:16:48 EST 2019
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-all.zip
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStorePath=wrapper/dists
diff --git a/es74x/Dockerfile b/plugin/Dockerfile
similarity index 50%
rename from es74x/Dockerfile
rename to plugin/Dockerfile
index 412354f4d..97065d4dc 100644
--- a/es74x/Dockerfile
+++ b/plugin/Dockerfile
@@ -1,4 +1,6 @@
-FROM docker.elastic.co/elasticsearch/elasticsearch:7.4.0
+FROM docker.elastic.co/elasticsearch/elasticsearch:7.6.2
+RUN yum -y install epel-release
+RUN yum -y install htop
 COPY build/distributions/*.zip .
 RUN elasticsearch-plugin install -b file:$(ls elastiknn*zip | sort | tail -n1)
diff --git a/plugin/build.gradle b/plugin/build.gradle
new file mode 100644
index 000000000..43a4e1a9e
--- /dev/null
+++ b/plugin/build.gradle
@@ -0,0 +1,47 @@
+
+buildscript {
+    repositories {
+        mavenLocal()
+        mavenCentral()
+        jcenter()
+    }
+    dependencies {
+        classpath "org.elasticsearch.gradle:build-tools:${esVersion}"
+    }
+}
+
+version = "${version}_es${esVersion}"
+
+apply plugin: 'elasticsearch.esplugin'
+
+licenseFile = rootProject.file("LICENSE.txt")
+noticeFile = rootProject.file("NOTICE.txt")
+
+dependencyLicenses.enabled = false
+thirdPartyAudit.enabled = false
+licenseHeaders.enabled = false
+testingConventions.enabled = false
+
+configurations {
+    all {
+        resolutionStrategy.preferProjectModules()
+    }
+}
+
+dependencies {
+    runtime project(':core')
+    implementation project(':core')
+    runtime "org.scala-lang:scala-library:${scalaVersion}"
+    implementation "org.scala-lang:scala-library:${scalaVersion}"
+    implementation "com.google.guava:guava:28.1-jre"
+    runtime "com.google.guava:guava:28.1-jre"
+}
+
+esplugin {
+    name 'elastiknn'
+    description 'Ingest processor and queries for exact and approximate nearest neighbors search'
+    classname 'com.klibisz.elastiknn.ElastiKnnPlugin'
+    licenseFile rootProject.file('LICENSE.txt')
+    noticeFile rootProject.file('NOTICE.txt')
+}
diff --git a/plugin/gradle.properties b/plugin/gradle.properties
new file mode 100644
index 000000000..5c9a1af74
--- /dev/null
+++ b/plugin/gradle.properties
@@ -0,0 +1 @@
+esVersion = 7.6.2
diff --git a/es74x/gradle/wrapper/gradle-wrapper.jar b/plugin/gradle/wrapper/gradle-wrapper.jar
similarity index 100%
rename from es74x/gradle/wrapper/gradle-wrapper.jar
rename to plugin/gradle/wrapper/gradle-wrapper.jar
diff --git a/es74x/gradle/wrapper/gradle-wrapper.properties b/plugin/gradle/wrapper/gradle-wrapper.properties
similarity index 100%
rename from es74x/gradle/wrapper/gradle-wrapper.properties
rename to plugin/gradle/wrapper/gradle-wrapper.properties
diff --git a/es74x/settings.gradle b/plugin/settings.gradle
similarity index 100%
rename from es74x/settings.gradle rename to plugin/settings.gradle diff --git a/es74x/src/main/java/com/klibisz/elastiknn/Empty.java b/plugin/src/main/java/com/klibisz/elastiknn/Empty.java similarity index 100% rename from es74x/src/main/java/com/klibisz/elastiknn/Empty.java rename to plugin/src/main/java/com/klibisz/elastiknn/Empty.java diff --git a/plugin/src/main/plugin-metadata/plugin-security.policy b/plugin/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 000000000..8c88d7163 --- /dev/null +++ b/plugin/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,7 @@ +grant { + permission java.lang.RuntimePermission "accessClassInPackage.sun.misc"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect"; + permission java.lang.RuntimePermission "reflectionFactoryAccess"; +}; diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala b/plugin/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala similarity index 100% rename from es74x/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala b/plugin/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala similarity index 94% rename from es74x/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala index 07654dbdb..bfa9af668 100644 --- a/es74x/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/mapper/VectorMapper.scala @@ -8,9 +8,10 @@ import com.klibisz.elastiknn.query.{ExactQuery, LshQuery, SparseIndexedQuery} import com.klibisz.elastiknn.{ELASTIKNN_NAME, VectorDimensionException} import io.circe.syntax._ import io.circe.{Json, JsonObject} -import org.apache.lucene.index.IndexableField +import org.apache.lucene.index.{IndexableField, Term} import org.apache.lucene.search.similarities.BooleanSimilarity -import org.apache.lucene.search.{DocValuesFieldExistsQuery, Query} +import org.apache.lucene.search.{DocValuesFieldExistsQuery, Query, TermInSetQuery, TermQuery} +import org.apache.lucene.util.BytesRef import org.elasticsearch.common.xcontent.{ToXContent, XContentBuilder} import org.elasticsearch.index.mapper.Mapper.TypeParser import org.elasticsearch.index.mapper._ @@ -146,8 +147,13 @@ abstract class VectorMapper[V <: Vec: ElasticsearchCodec] { self => override def typeName(): String = CONTENT_TYPE override def clone(): FieldType = new FieldType - override def termQuery(value: Any, context: QueryShardContext): Query = - throw new UnsupportedOperationException(s"Field [${name()}] of type [${typeName()}] doesn't support queries") + override def termQuery(value: Any, context: QueryShardContext): Query = value match { + case b: BytesRef => new TermQuery(new Term(name(), b)) + case _ => + throw new UnsupportedOperationException( + s"Field [${name()}] of type [${typeName()}] doesn't support term queries with value of type [${value.getClass}]") + } + override def existsQuery(context: QueryShardContext): Query = new DocValuesFieldExistsQuery(name()) } diff --git a/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala new file mode 100644 index 
000000000..36964fbd3 --- /dev/null +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala @@ -0,0 +1,62 @@ +package com.klibisz.elastiknn.query + +import java.util.Objects + +import com.klibisz.elastiknn.ELASTIKNN_NAME +import com.klibisz.elastiknn.api.Vec +import com.klibisz.elastiknn.models.ExactSimilarityFunction +import com.klibisz.elastiknn.storage.StoredVec +import org.apache.lucene.document.BinaryDocValuesField +import org.apache.lucene.index.{IndexableField, LeafReaderContext} +import org.apache.lucene.search.{DocValuesFieldExistsQuery, Explanation} +import org.apache.lucene.util.BytesRef +import org.elasticsearch.common.lucene.search.function._ + +object ExactQuery { + + private class ExactScoreFunction[V <: Vec, S <: StoredVec](val field: String, val queryVec: V, val simFunc: ExactSimilarityFunction[V, S])( + implicit codec: StoredVec.Codec[V, S]) + extends ScoreFunction(CombineFunction.REPLACE) { + + override def getLeafScoreFunction(ctx: LeafReaderContext): LeafScoreFunction = { + val vecDocVals = ctx.reader.getBinaryDocValues(vectorDocValuesField(field)) + new LeafScoreFunction { + override def score(docId: Int, subQueryScore: Float): Double = + if (vecDocVals.advanceExact(docId)) { + val binVal = vecDocVals.binaryValue() + val storedVec = codec.decode(binVal.bytes, binVal.offset, binVal.length) + simFunc(queryVec, storedVec) + } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") + + override def explainScore(docId: Int, subQueryScore: Explanation): Explanation = { + Explanation.`match`(100, "Elastiknn exact query") + } + } + } + + override def needsScores(): Boolean = false + + override def doEquals(other: ScoreFunction): Boolean = other match { + case f: ExactScoreFunction[V, S] => field == f.field && queryVec == f.queryVec && simFunc == f.simFunc + case _ => false + } + + override def doHashCode(): Int = Objects.hash(field, queryVec, simFunc) + } + + def apply[V <: Vec, S <: StoredVec](field: String, queryVec: V, simFunc: ExactSimilarityFunction[V, S])( + implicit codec: StoredVec.Codec[V, S]): FunctionScoreQuery = { + val subQuery = new DocValuesFieldExistsQuery(vectorDocValuesField(field)) + val func = new ExactScoreFunction(field, queryVec, simFunc) + new FunctionScoreQuery(subQuery, func) + } + + // Docvalue fields can have a custom name, but "regular" values (e.g. Terms) must keep the name of the field.
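+  // For example, vectors for a field named "my_vec" are stored under "my_vec.elastiknn.vector" (assuming ELASTIKNN_NAME resolves to "elastiknn").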
+ def vectorDocValuesField(field: String): String = s"$field.$ELASTIKNN_NAME.vector" + + def index[V <: Vec: StoredVec.Encoder](field: String, vec: V): Seq[IndexableField] = { + val bytes = implicitly[StoredVec.Encoder[V]].apply(vec) + Seq(new BinaryDocValuesField(vectorDocValuesField(field), new BytesRef(bytes))) + } + +} diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala similarity index 89% rename from es74x/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala index 93bea3a9b..807ed7d34 100644 --- a/es74x/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/KnnQueryBuilder.scala @@ -8,7 +8,7 @@ import com.google.common.io.BaseEncoding import com.klibisz.elastiknn.api.ElasticsearchCodec._ import com.klibisz.elastiknn.api._ import com.klibisz.elastiknn.models.{ExactSimilarityFunction, SparseIndexedSimilarityFunction} -import com.klibisz.elastiknn.storage.VecCache +import com.klibisz.elastiknn.storage.StoredVec import com.klibisz.elastiknn.utils.CirceUtils.javaMapEncoder import com.klibisz.elastiknn.{ELASTIKNN_NAME, api} import io.circe.Json @@ -66,25 +66,24 @@ final case class KnnQueryBuilder(query: NearestNeighborsQuery) extends AbstractQ // Have to get the mapping inside doToQuery because only QueryShardContext defines the index name and a client to make requests. val mapping: Mapping = getMapping(c) import NearestNeighborsQuery._ - val index = c.index.getName (query, mapping) match { case (Exact(f, v: Vec.SparseBool, Similarity.Jaccard), _: Mapping.SparseBool | _: Mapping.SparseIndexed | _: Mapping.JaccardLsh | _: Mapping.HammingLsh) => - ExactQuery(f, v, ExactSimilarityFunction.Jaccard, VecCache.SparseBool(index, f)) + ExactQuery(f, v, ExactSimilarityFunction.Jaccard) case (Exact(f, v: Vec.SparseBool, Similarity.Hamming), _: Mapping.SparseBool | _: Mapping.SparseIndexed | _: Mapping.JaccardLsh | _: Mapping.HammingLsh) => - ExactQuery(f, v, ExactSimilarityFunction.Hamming, VecCache.SparseBool(index, f)) + ExactQuery(f, v, ExactSimilarityFunction.Hamming) case (Exact(f, v: Vec.DenseFloat, Similarity.L1), _: Mapping.DenseFloat | _: Mapping.AngularLsh | _: Mapping.L2Lsh) => - ExactQuery(f, v, ExactSimilarityFunction.L1, VecCache.DenseFloat(index, f)) + ExactQuery(f, v, ExactSimilarityFunction.L1) case (Exact(f, v: Vec.DenseFloat, Similarity.L2), _: Mapping.DenseFloat | _: Mapping.AngularLsh | _: Mapping.L2Lsh) => - ExactQuery(f, v, ExactSimilarityFunction.L2, VecCache.DenseFloat(index, f)) + ExactQuery(f, v, ExactSimilarityFunction.L2) case (Exact(f, v: Vec.DenseFloat, Similarity.Angular), _: Mapping.DenseFloat | _: Mapping.AngularLsh | _: Mapping.L2Lsh) => - ExactQuery(f, v, ExactSimilarityFunction.Angular, VecCache.DenseFloat(index, f)) + ExactQuery(f, v, ExactSimilarityFunction.Angular) case (SparseIndexed(f, sbv: Vec.SparseBool, Similarity.Jaccard), _: Mapping.SparseIndexed) => SparseIndexedQuery(f, sbv, SparseIndexedSimilarityFunction.Jaccard) @@ -93,16 +92,16 @@ final case class KnnQueryBuilder(query: NearestNeighborsQuery) extends AbstractQ SparseIndexedQuery(f, sbv, SparseIndexedSimilarityFunction.Hamming) case (JaccardLsh(f, v: Vec.SparseBool, candidates), m: Mapping.JaccardLsh) => - LshQuery(f, m, v, candidates, VecCache.SparseBool(index, f)) + LshQuery(f, m, v, candidates, LshFunctionCache.Jaccard) case 
(HammingLsh(f, v: Vec.SparseBool, candidates), m: Mapping.HammingLsh) => - LshQuery(f, m, v, candidates, VecCache.SparseBool(index, f)) + LshQuery(f, m, v, candidates, LshFunctionCache.Hamming) case (AngularLsh(f, v: Vec.DenseFloat, candidates), m: Mapping.AngularLsh) => - LshQuery(f, m, v, candidates, VecCache.DenseFloat(index, f)) + LshQuery(f, m, v, candidates, LshFunctionCache.Angular) case (L2Lsh(f, v: Vec.DenseFloat, candidates), m: Mapping.L2Lsh) => - LshQuery(f, m, v, candidates, VecCache.DenseFloat(index, f)) + LshQuery(f, m, v, candidates, LshFunctionCache.L2) case _ => throw incompatible(mapping, query) } diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala similarity index 50% rename from es74x/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala index 1aa06eb01..7d664e831 100644 --- a/es74x/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshFunctionCache.scala @@ -5,29 +5,30 @@ import java.time.Duration import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.klibisz.elastiknn.api.{Mapping, Vec} import com.klibisz.elastiknn.models.LshFunction +import com.klibisz.elastiknn.storage.StoredVec // LSH functions tend to be expensive to instantiate (e.g. initializing hashing parameters), hence this cache. -sealed trait LshFunctionCache[M <: Mapping, V <: Vec] extends (M => LshFunction[M, V]) { self => - private val cache: LoadingCache[M, LshFunction[M, V]] = CacheBuilder.newBuilder +sealed trait LshFunctionCache[M <: Mapping, V <: Vec, S <: StoredVec] extends (M => LshFunction[M, V, S]) { self => + private val cache: LoadingCache[M, LshFunction[M, V, S]] = CacheBuilder.newBuilder .expireAfterWrite(Duration.ofSeconds(60)) - .build(new CacheLoader[M, LshFunction[M, V]] { - override def load(m: M): LshFunction[M, V] = self.load(m) + .build(new CacheLoader[M, LshFunction[M, V, S]] { + override def load(m: M): LshFunction[M, V, S] = self.load(m) }) - override final def apply(mapping: M): LshFunction[M, V] = cache.get(mapping) - protected def load(m: M): LshFunction[M, V] + override final def apply(mapping: M): LshFunction[M, V, S] = cache.get(mapping) + protected def load(m: M): LshFunction[M, V, S] } object LshFunctionCache { - implicit object Jaccard extends LshFunctionCache[Mapping.JaccardLsh, Vec.SparseBool] { - def load(m: Mapping.JaccardLsh): LshFunction[Mapping.JaccardLsh, Vec.SparseBool] = new LshFunction.Jaccard(m) + implicit object Jaccard extends LshFunctionCache[Mapping.JaccardLsh, Vec.SparseBool, StoredVec.SparseBool] { + def load(m: Mapping.JaccardLsh): LshFunction[Mapping.JaccardLsh, Vec.SparseBool, StoredVec.SparseBool] = new LshFunction.Jaccard(m) } - implicit object Hamming extends LshFunctionCache[Mapping.HammingLsh, Vec.SparseBool] { - def load(m: Mapping.HammingLsh): LshFunction[Mapping.HammingLsh, Vec.SparseBool] = new LshFunction.Hamming(m) + implicit object Hamming extends LshFunctionCache[Mapping.HammingLsh, Vec.SparseBool, StoredVec.SparseBool] { + def load(m: Mapping.HammingLsh): LshFunction[Mapping.HammingLsh, Vec.SparseBool, StoredVec.SparseBool] = new LshFunction.Hamming(m) } - implicit object Angular extends LshFunctionCache[Mapping.AngularLsh, Vec.DenseFloat] { - def load(m: Mapping.AngularLsh): LshFunction[Mapping.AngularLsh, Vec.DenseFloat] = new
LshFunction.Angular(m) + implicit object Angular extends LshFunctionCache[Mapping.AngularLsh, Vec.DenseFloat, StoredVec.DenseFloat] { + def load(m: Mapping.AngularLsh): LshFunction[Mapping.AngularLsh, Vec.DenseFloat, StoredVec.DenseFloat] = new LshFunction.Angular(m) } - implicit object L2 extends LshFunctionCache[Mapping.L2Lsh, Vec.DenseFloat] { - def load(m: Mapping.L2Lsh): LshFunction[Mapping.L2Lsh, Vec.DenseFloat] = new LshFunction.L2(m) + implicit object L2 extends LshFunctionCache[Mapping.L2Lsh, Vec.DenseFloat, StoredVec.DenseFloat] { + def load(m: Mapping.L2Lsh): LshFunction[Mapping.L2Lsh, Vec.DenseFloat, StoredVec.DenseFloat] = new LshFunction.L2(m) } } diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala similarity index 56% rename from es74x/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala index b692e2699..3cf63a5bc 100644 --- a/es74x/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala @@ -4,47 +4,38 @@ import java.lang import java.util.Objects import com.google.common.collect.MinMaxPriorityQueue -import com.klibisz.elastiknn.api.{ElasticsearchCodec, Mapping, Vec} +import com.klibisz.elastiknn.api.{Mapping, Vec} import com.klibisz.elastiknn.models.LshFunction -import com.klibisz.elastiknn.storage.ByteArrayCodec -import com.klibisz.elastiknn.storage.VecCache.ContextCache +import com.klibisz.elastiknn.storage.{StoredVec, UnsafeSerialization} import org.apache.lucene.document.{Field, FieldType} import org.apache.lucene.index._ import org.apache.lucene.search._ import org.apache.lucene.util.BytesRef import org.elasticsearch.common.lucene.search.function.{CombineFunction, FunctionScoreQuery, LeafScoreFunction, ScoreFunction} object LshQuery { - private class LshScoreFunction[M <: Mapping: ElasticsearchCodec, V <: Vec: ByteArrayCodec: ElasticsearchCodec]( + private class LshScoreFunction[M <: Mapping, V <: Vec, S <: StoredVec]( val field: String, val query: V, val candidates: Int, - val lshFunc: LshFunction[M, V], - val contextCache: ContextCache[V]) + val lshFunc: LshFunction[M, V, S])(implicit codec: StoredVec.Codec[V, S]) extends ScoreFunction(CombineFunction.REPLACE) { private val candsHeap: MinMaxPriorityQueue[lang.Float] = MinMaxPriorityQueue.create() override def getLeafScoreFunction(ctx: LeafReaderContext): LeafScoreFunction = { val vecDocVals = ctx.reader.getBinaryDocValues(ExactQuery.vectorDocValuesField(field)) - val docIdCache = contextCache.get(ctx) - - def exactScore(docId: Int): Float = { - val storedVec = docIdCache.get( - docId, - () => - if (vecDocVals.advanceExact(docId)) { - val binaryValue = vecDocVals.binaryValue - val vecBytes = binaryValue.bytes.take(binaryValue.length) - implicitly[ByteArrayCodec[V]].apply(vecBytes).get - } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") - ) - lshFunc.exact(query, storedVec).toFloat - } + + def exactScore(docId: Int): Double = + if (vecDocVals.advanceExact(docId)) { + val binVal = vecDocVals.binaryValue + val storedVec = codec.decode(binVal.bytes, binVal.offset,
binVal.length) + lshFunc.exact(query, storedVec) + } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") new LeafScoreFunction { - override def score(docId: Int, intersection: Float): Double = { + override def score(docId: Int, intersection: Float): Double = if (candidates == 0) intersection else if (candsHeap.size() < candidates) { candsHeap.add(intersection) @@ -54,7 +48,6 @@ object LshQuery { candsHeap.add(intersection) exactScore(docId) } else 0f - } override def explainScore(docId: Int, subQueryScore: Explanation): Explanation = Explanation.`match`(100, "Computing LSH similarity") @@ -64,7 +57,7 @@ object LshQuery { override def needsScores(): Boolean = true // This is actually important in the FunctionScoreQuery internals. override def doEquals(other: ScoreFunction): Boolean = other match { - case q: LshScoreFunction[M, V] => + case q: LshScoreFunction[M, V, S] => q.field == field && q.query == query && q.lshFunc == lshFunc && q.candidates == candidates case _ => false } @@ -72,25 +65,26 @@ object LshQuery { override def doHashCode(): Int = Objects.hash(field, query, lshFunc, candidates.asInstanceOf[AnyRef]) } - def apply[M <: Mapping: ElasticsearchCodec, V <: Vec: ByteArrayCodec: ElasticsearchCodec]( + def apply[M <: Mapping, V <: Vec, S <: StoredVec]( field: String, mapping: M, queryVec: V, candidates: Int, - cache: ContextCache[V])(implicit lshFunctionCache: LshFunctionCache[M, V]): FunctionScoreQuery = { + lshFunctionCache: LshFunctionCache[M, V, S])(implicit codec: StoredVec.Codec[V, S]): Query = { val lshFunc: LshFunction[M, V, S] = lshFunctionCache(mapping) val isecQuery: BooleanQuery = { val builder = new BooleanQuery.Builder lshFunc(queryVec).foreach { h => - val term = new Term(field, new BytesRef(ByteArrayCodec.encode(h))) + val term = new Term(field, new BytesRef(UnsafeSerialization.writeInt(h))) val termQuery = new TermQuery(term) - val clause = new BooleanClause(termQuery, BooleanClause.Occur.SHOULD) - builder.add(clause) + val constQuery = new ConstantScoreQuery(termQuery) // TODO: is this necessary?
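+        // Note: with constant scores each matching hash term contributes exactly 1 to the sub-query score, so the summed SHOULD clauses yield the raw count of intersecting hash terms, i.e. the `intersection` passed to the score function above.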
+ builder.add(new BooleanClause(constQuery, BooleanClause.Occur.SHOULD)) } + builder.setMinimumNumberShouldMatch(1) builder.build() } - val f = new LshScoreFunction(field, queryVec, candidates, lshFunc, cache) - new FunctionScoreQuery(isecQuery, f, CombineFunction.REPLACE, 0f, Float.MaxValue) + val func = new LshScoreFunction(field, queryVec, candidates, lshFunc) + new FunctionScoreQuery(isecQuery, func, CombineFunction.REPLACE, 0f, Float.MaxValue) } private val hashesFieldType: FieldType = { @@ -101,10 +95,10 @@ object LshQuery { ft } - def index[M <: Mapping, V <: Vec: ByteArrayCodec](field: String, vec: V, mapping: M)( - implicit lshFunctionCache: LshFunctionCache[M, V]): Seq[IndexableField] = { + def index[M <: Mapping, V <: Vec: StoredVec.Encoder, S <: StoredVec](field: String, vec: V, mapping: M)( + implicit lshFunctionCache: LshFunctionCache[M, V, S]): Seq[IndexableField] = { ExactQuery.index(field, vec) ++ lshFunctionCache(mapping)(vec).map { h => - new Field(field, ByteArrayCodec.encode(h), hashesFieldType) + new Field(field, UnsafeSerialization.writeInt(h), hashesFieldType) } } diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/query/RewriteQueryBuilder.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/RewriteQueryBuilder.scala similarity index 100% rename from es74x/src/main/scala/com/klibisz/elastiknn/query/RewriteQueryBuilder.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/query/RewriteQueryBuilder.scala diff --git a/es74x/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala similarity index 91% rename from es74x/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala rename to plugin/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala index 8bf25b0f7..8f53cc155 100644 --- a/es74x/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/SparseIndexedQuery.scala @@ -4,7 +4,7 @@ import java.util.Objects import com.klibisz.elastiknn.api._ import com.klibisz.elastiknn.models.SparseIndexedSimilarityFunction -import com.klibisz.elastiknn.storage.ByteArrayCodec +import com.klibisz.elastiknn.storage.UnsafeSerialization import org.apache.lucene.document.{Field, FieldType, NumericDocValuesField} import org.apache.lucene.index._ import org.apache.lucene.search._ @@ -24,7 +24,7 @@ object SparseIndexedQuery { if (numTrueDocValues.advanceExact(docId)) { val numTrue = numTrueDocValues.longValue().toInt // Subtract one from intersection to account for value exists query in boolean query. 
- simFunc(queryVec, intersection.toInt - 1, numTrue).toFloat + simFunc(queryVec, intersection.toInt - 1, numTrue) } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") } @@ -49,7 +49,7 @@ object SparseIndexedQuery { val builder = new BooleanQuery.Builder builder.add(new BooleanClause(new DocValuesFieldExistsQuery(numTrueDocValueField(field)), BooleanClause.Occur.MUST)) queryVec.trueIndices.foreach { ti => - val term = new Term(field, new BytesRef(ByteArrayCodec.encode(ti))) + val term = new Term(field, new BytesRef(UnsafeSerialization.writeInt(ti))) val termQuery = new TermQuery(term) val clause = new BooleanClause(termQuery, BooleanClause.Occur.SHOULD) builder.add(clause) @@ -72,7 +72,7 @@ object SparseIndexedQuery { def index(field: String, vec: Vec.SparseBool): Seq[IndexableField] = { vec.trueIndices.map { ti => - new Field(field, ByteArrayCodec.encode(ti), trueIndicesFieldType) + new Field(field, UnsafeSerialization.writeInt(ti), trueIndicesFieldType) } ++ ExactQuery.index(field, vec) :+ new NumericDocValuesField(numTrueDocValueField(field), vec.trueIndices.length) } diff --git a/es74x/src/test/java/com/klibisz/elastiknn/ElastiKnnJavaClusterIT.java b/plugin/src/test/java/com/klibisz/elastiknn/ElastiKnnJavaClusterIT.java similarity index 100% rename from es74x/src/test/java/com/klibisz/elastiknn/ElastiKnnJavaClusterIT.java rename to plugin/src/test/java/com/klibisz/elastiknn/ElastiKnnJavaClusterIT.java diff --git a/es74x/src/test/java/com/klibisz/elastiknn/ElastiKnnRestIT.java b/plugin/src/test/java/com/klibisz/elastiknn/ElastiKnnRestIT.java similarity index 100% rename from es74x/src/test/java/com/klibisz/elastiknn/ElastiKnnRestIT.java rename to plugin/src/test/java/com/klibisz/elastiknn/ElastiKnnRestIT.java diff --git a/es74x/src/test/resources/rest-api-spec/test/elastiknn/01_plugin_installed.yml b/plugin/src/test/resources/rest-api-spec/test/elastiknn/01_plugin_installed.yml similarity index 100% rename from es74x/src/test/resources/rest-api-spec/test/elastiknn/01_plugin_installed.yml rename to plugin/src/test/resources/rest-api-spec/test/elastiknn/01_plugin_installed.yml diff --git a/es74x/src/test/resources/rest-api-spec/test/elastiknn/20_basic_2.yml b/plugin/src/test/resources/rest-api-spec/test/elastiknn/20_basic_2.yml similarity index 100% rename from es74x/src/test/resources/rest-api-spec/test/elastiknn/20_basic_2.yml rename to plugin/src/test/resources/rest-api-spec/test/elastiknn/20_basic_2.yml diff --git a/reference/build.gradle b/reference/build.gradle index e446b7f34..5b2e68cb1 100644 --- a/reference/build.gradle +++ b/reference/build.gradle @@ -1,11 +1,17 @@ dependencies { implementation project(':core') + implementation project(':plugin') + implementation project(':client-elastic4s') runtime "org.scala-lang:scala-library:${scalaVersion}" implementation "org.scala-lang:scala-library:${scalaVersion}" implementation 'org.apache.commons:commons-math3:3.6.1' implementation 'org.apache.spark:spark-mllib_2.12:2.4.4' - implementation 'org.elasticsearch:elasticsearch:7.4.0' + implementation "org.elasticsearch:elasticsearch:7.6.2" implementation 'org.apache.lucene:lucene-codecs:8.2.0' implementation "io.circe:circe-generic_${scalaVersion}:${circeVersion}" implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeVersion}" -} \ No newline at end of file + implementation 'com.lihaoyi:upickle_2.12:1.1.0' + implementation 'de.ruedigermoeller:fst:2.48' + implementation 'com.esotericsoftware:kryo:4.0.2' + runtime 'com.esotericsoftware:kryo:4.0.2' +} 
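The sparse-indexed and LSH queries above share one encoding convention: every integer of interest (a true index or an LSH hash) is serialized with UnsafeSerialization.writeInt and indexed as an opaque binary term, so query-side code must encode candidates identically. A minimal sketch of that query-side encoding, using only APIs that appear in this patch (the object and method names here are hypothetical):

import com.klibisz.elastiknn.storage.UnsafeSerialization
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, TermQuery}
import org.apache.lucene.util.BytesRef

object EncodedTermSketch {
  // Match documents containing any of the given ints, encoded exactly as
  // SparseIndexedQuery.index writes true indices at indexing time.
  def anyOf(field: String, ints: Array[Int]): BooleanQuery = {
    val builder = new BooleanQuery.Builder
    ints.foreach { i =>
      val term = new Term(field, new BytesRef(UnsafeSerialization.writeInt(i)))
      builder.add(new BooleanClause(new TermQuery(term), BooleanClause.Occur.SHOULD))
    }
    builder.build()
  }
}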
diff --git a/reference/src/main/scala/com/klibisz/elastiknn/reference/serialization/SerializationBenchmark.scala b/reference/src/main/scala/com/klibisz/elastiknn/reference/serialization/SerializationBenchmark.scala new file mode 100644 index 000000000..1242e0b4f --- /dev/null +++ b/reference/src/main/scala/com/klibisz/elastiknn/reference/serialization/SerializationBenchmark.scala @@ -0,0 +1,132 @@ +package com.klibisz.elastiknn.reference.serialization + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{UnsafeInput, UnsafeOutput} +import com.klibisz.elastiknn.api.Vec + +import scala.util.Random + +object SerializationBenchmark { + + def time[T](msg: String, op: => T): T = { + val t0 = System.currentTimeMillis() + val res = op + println(s"$msg: ${System.currentTimeMillis() - t0} ms") + res + } + + def main(args: Array[String]): Unit = { + + val n = 10000 + val m = 10 + + implicit val r = new Random(99) + val vecs = Vec.SparseBool.randoms(4096, n) + +// val fstConf = FSTConfiguration.createUnsafeBinaryConfiguration() +// fstConf.registerClass(classOf[Vec.SparseBool]) + + val kryo = new Kryo() + kryo.register(classOf[Array[Int]]) + + for { + _ <- 0 until m + } { + +// val vecsProto: Seq[Array[Byte]] = time(s"Write proto", vecs.map(v => ByteArrayCodec.sparseBoolVector(v))) +// println(vecsProto.map(_.length).sum) +// time[Unit](s"Read proto", vecsProto.foreach(b => ByteArrayCodec.sparseBoolVector(b).get)) + +// val vecsFst = time(s"Write FST", vecs.map(fstConf.asByteArray)) +// println(vecsFst.map(_.length).sum) +// val checkFst = time(s"Read FST", vecsFst.map { b => +// fstConf.asObject(b).asInstanceOf[Vec.SparseBool] +// }) +// require(vecs == checkFst) + +// val colfBuffer = new Array[Byte](ColferSparseBool.colferSizeMax) + +// val vecsOOS = time( +// "Write ObjectOutputStream", +// vecs.map { v => +// val bout = new ByteArrayOutputStream() +// val oout = new ObjectOutputStream(bout) +// oout.writeObject(v.totalIndices +: v.trueIndices) +// bout.toByteArray +// } +// ) +// println(vecsOOS.map(_.length).sum) +// +// val checkOOS = time( +// "Read ObjectOutputStream", +// vecsOOS.map { b => +// val bin = new ByteArrayInputStream(b) +// val oin = new ObjectInputStream(bin) +// val arr = oin.readObject.asInstanceOf[Array[Int]] +// Vec.SparseBool(arr.tail, arr.head) +// } +// ) +// assert(vecs == checkOOS) + +// val vecsMsgpack: Seq[Array[Byte]] = time(s"Serialize ${vecs.length} to msgpack", vecs.map { v => +// writeBinary[Array[Int]](v.trueIndices) +// }) +// println(vecsMsgpack.map(_.length).sum) +// time[Unit](s"Deserialize ${vecs.length} from msgpack", vecsMsgpack.foreach(b => readBinary[Array[Int]](b))) + +// val vecsDataOutputStream = time( +// s"Write DataOutputStream", +// vecs.map { v => +// val bout = new ByteArrayOutputStream() +// val dout = new DataOutputStream(bout) +// dout.writeInt(v.trueIndices.length) +// v.trueIndices.foreach(dout.writeShort) +// bout.toByteArray +// } +// ) +// println(vecsDataOutputStream.map(_.length).sum) +// +// val checkDataOutputStream = time( +// s"Read DataOutputStream", +// vecsDataOutputStream.map { b => +// val bin = new ByteArrayInputStream(b) +// val din = new DataInputStream(bin) +// val len = din.readInt() +// val arr = new Array[Int](len) +// arr.indices.foreach(i => arr.update(i, din.readShort())) +// Vec.SparseBool(arr, 4096) +// } +// ) +// require(vecs == checkDataOutputStream) + + val vecsKryo = time( + "Write kryo", + vecs.map { v => + val kout = new UnsafeOutput((v.trueIndices.length + 1) * 4) + 
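// Capacity: one 4-byte length prefix plus 4 bytes per index. +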
kout.writeInt(v.trueIndices.length) + kout.writeInts(v.trueIndices) + kout.close() + kout.toBytes + } + ) + println(vecsKryo.map(_.length).sum) + + val checkKryo = time( + "Read kryo", + vecsKryo.map { b => + val kin = new UnsafeInput(b) + val len = kin.readInt() + val arr = kin.readInts(len) + kin.close() + Vec.SparseBool(arr, 4096) + } + ) + require(vecs == checkKryo) + + (0 to 5).foreach(_ => println("")) + + } + + } + +} diff --git a/settings.gradle b/settings.gradle index b15f230ee..692c1406c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,7 +1,8 @@ rootProject.name = 'elastiknn' include 'core' -include 'es74x' +include 'plugin' include 'testing' include 'reference' include 'client-elastic4s' +include 'benchmarks' diff --git a/testing/build.gradle b/testing/build.gradle index 97694a75f..d8b55f167 100644 --- a/testing/build.gradle +++ b/testing/build.gradle @@ -36,7 +36,6 @@ dependencies { implementation "org.scala-lang:scala-library:$scalaVersion" implementation 'com.typesafe:config:1.4.0' implementation "com.sksamuel.elastic4s:elastic4s-client-esjava_${scalaVersion}:7.6.0" - implementation "io.github.scalapb-json:scalapb-circe_${scalaVersion}:${scalapbCirceVersion}" implementation "io.circe:circe-generic_${scalaVersion}:${circeVersion}" implementation "org.scalatest:scalatest_${scalaVersion}:3.0.8" testImplementation 'org.apache.commons:commons-math3:3.6.1' diff --git a/testing/docker-compose.yml b/testing/docker-compose.yml index 2c5155d9f..54f71fb87 100644 --- a/testing/docker-compose.yml +++ b/testing/docker-compose.yml @@ -4,7 +4,7 @@ services: # Single master node. elasticsearch_master: build: - context: ${ES_CONTEXT:-../es74x} + context: ${ES_CONTEXT:-../plugin} dockerfile: Dockerfile container_name: elasticsearch_master environment: @@ -28,7 +28,7 @@ services: # Single client node exposing port 9200. elasticsearch_client: build: - context: ${ES_CONTEXT:-../es74x} + context: ${ES_CONTEXT:-../plugin} dockerfile: Dockerfile ports: - "9200:9200" @@ -56,7 +56,7 @@ services: # 1 or more data nodes (--scale elasticsearch_data=n) elasticsearch_data: build: - context: ${ES_CONTEXT:-../es74x} + context: ${ES_CONTEXT:-../plugin} dockerfile: Dockerfile environment: - cluster.name=docker-cluster diff --git a/testing/src/test/scala/com/klibisz/elastiknn/ClusterSpec.scala b/testing/src/test/scala/com/klibisz/elastiknn/ClusterSpec.scala index 4e0681846..7d2a52af4 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/ClusterSpec.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/ClusterSpec.scala @@ -31,7 +31,8 @@ class ClusterSpec extends AsyncFunSuite with Matchers with Elastic4sMatchers wit } yield { catNodesRes.shouldBeSuccess catNodesRes.result should have length 4 - catNodesRes.result.map(_.nodeRole).sorted shouldBe Seq("-", "dil", "dil", "m").sorted + catNodesRes.result.map(_.nodeRole).sorted shouldBe List("-", "dil", "dil", "m").sorted + // These change to Seq("dilrt", "dilrt", "mr", "r") in version 7.7.x. 
} } diff --git a/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala index dc056abe3..c9a4b6bc6 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/api/ElasticsearchCodecSuite.scala @@ -29,7 +29,8 @@ class ElasticsearchCodecSuite extends FunSuite with Matchers { def shouldNotDecodeTo[T: ElasticsearchCodec]: Assertion = { val parsed = ElasticsearchCodec.parse(s) - assertThrows[DecodingFailure](parsed.flatMap(ElasticsearchCodec.decodeJson[T]).toTry.get) + val tryDecode = parsed.flatMap(ElasticsearchCodec.decodeJson[T]).toTry + assertThrows[DecodingFailure](tryDecode.get) } } diff --git a/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala index 93b9f71f3..4fa049b0b 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala @@ -133,11 +133,11 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with } test("throw an error given vector with bad dimensions") { - val indexName = s"test-${UUID.randomUUID()}" + val indexName = s"test-intentional-failure-${UUID.randomUUID()}" val dims = 100 val inputs = Seq( - ("vec_sbv", Mapping.SparseBool(dims), Vec.SparseBool.random(dims + 1)), - ("vec_dfv", Mapping.DenseFloat(dims), Vec.DenseFloat.random(dims + 1)) + ("intentional-failure-sbv", Mapping.SparseBool(dims), Vec.SparseBool.random(dims + 1)), + ("intentional-failure-dfv", Mapping.DenseFloat(dims), Vec.DenseFloat.random(dims + 1)) ) for { _ <- eknn.execute(createIndex(indexName)) diff --git a/testing/src/test/scala/com/klibisz/elastiknn/storage/UnsafeSerializationSuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/storage/UnsafeSerializationSuite.scala new file mode 100644 index 000000000..e39271c8f --- /dev/null +++ b/testing/src/test/scala/com/klibisz/elastiknn/storage/UnsafeSerializationSuite.scala @@ -0,0 +1,87 @@ +package com.klibisz.elastiknn.storage + +import org.scalatest.{FunSuite, Matchers} + +import scala.util.Random + +class UnsafeSerializationSuite extends FunSuite with Matchers { + + test("arrays of ints") { + val seed = System.currentTimeMillis() + val maxLen = 4096 + val rng = new Random(seed) + for (i <- 0 to 1000) { + withClue(s"Failed on iteration $i with seed $seed and max length $maxLen") { + // Generate array of random ints. + val len = rng.nextInt(maxLen) + val iarr = (0 until len).map(_ => rng.nextInt(Int.MaxValue) * (if (rng.nextBoolean()) 1 else -1)).toArray + + // Serialize and check serialized length. + val trimmed = UnsafeSerialization.writeInts(iarr) + trimmed should have length (iarr.length * UnsafeSerialization.numBytesInInt) + + // Deserialize and check. + val iarrReadTrimmed = UnsafeSerialization.readInts(trimmed, 0, trimmed.length) + iarrReadTrimmed shouldBe iarr + + // Place in larger array with random offset. + val offset = rng.nextInt(maxLen) + val embedded = new Array[Byte](offset) ++ trimmed ++ new Array[Byte](rng.nextInt(maxLen)) + + // Deserialize and check. 
+ val iarrReadEmbedded = UnsafeSerialization.readInts(embedded, offset, trimmed.length) + iarrReadEmbedded shouldBe iarr + } + } + } + + test("arrays of floats") { + val seed = System.currentTimeMillis() + val maxLen = 4096 + val rng = new Random(seed) + for (i <- 0 to 1000) { + withClue(s"Failed on iteration $i with seed $seed and max length $maxLen") { + // Generate array of random floats. + val len = rng.nextInt(maxLen) + val farr = (0 until len).map(_ => rng.nextFloat() * (if (rng.nextBoolean()) Float.MaxValue else Float.MinValue)).toArray + + // Serialize and check length. + val trimmed = UnsafeSerialization.writeFloats(farr) + trimmed should have length (farr.length * UnsafeSerialization.numBytesInFloat) + + // Deserialize and check. + val farrTrimmed = UnsafeSerialization.readFloats(trimmed, 0, trimmed.length) + farrTrimmed shouldBe farr + + // Place in larger array with random offset. + val offset = rng.nextInt(maxLen) + val embedded = new Array[Byte](offset) ++ trimmed ++ new Array[Byte](rng.nextInt(maxLen)) + + // Deserialize and check. + val farrReadEmbedded = UnsafeSerialization.readFloats(embedded, offset, trimmed.length) + farrReadEmbedded shouldBe farr + } + } + } + + test("ints variable length encoding") { + UnsafeSerialization.writeInt(127) should have length 1 + UnsafeSerialization.writeInt(-127) should have length 1 + UnsafeSerialization.writeInt(32767) should have length 2 + UnsafeSerialization.writeInt(-32767) should have length 2 + } + + test("ints randomized") { + val seed = System.currentTimeMillis() + val rng = new Random(seed) + for (i <- 0 to 10000) { + withClue(s"Failed on iteration $i with seed $seed") { + // Use a distinct name to avoid shadowing the loop counter used in the clue. + val v = rng.nextInt(Int.MaxValue) * (if (rng.nextBoolean()) 1 else -1) + val barr = UnsafeSerialization.writeInt(v) + val iRead = UnsafeSerialization.readInt(barr) + iRead shouldBe v + } + } + } + +}
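To summarize the serialization contract exercised by this suite, here is a tiny standalone usage sketch; the UnsafeSerialization method names and constants are taken from the tests above, while the surrounding program is an assumed illustration:

import com.klibisz.elastiknn.storage.UnsafeSerialization

object UnsafeSerializationExample {
  def main(args: Array[String]): Unit = {
    // Arrays round-trip with a fixed width of numBytesInInt (4) bytes per element.
    val indices = Array(1, 42, 99999)
    val bytes = UnsafeSerialization.writeInts(indices)
    assert(bytes.length == indices.length * UnsafeSerialization.numBytesInInt)
    assert(UnsafeSerialization.readInts(bytes, 0, bytes.length).sameElements(indices))

    // Single ints use a variable-length encoding: magnitudes up to 127 fit in one byte.
    val small = UnsafeSerialization.writeInt(127)
    assert(small.length == 1)
    assert(UnsafeSerialization.readInt(small) == 127)
  }
}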