Skip to content

Commit

Permalink
Perf improvements based on benchmarking (without all of the benchmark…
Browse files Browse the repository at this point in the history
…ing code) (#75)

Some internal changes based on a lot of benchmarking work.
Deleted the benchmarking code from this branch and will merge it as a separate PR.
See changelog entry.
  • Loading branch information
alexklibisz authored Jun 17, 2020
1 parent 9d582a1 commit ac08eab
Show file tree
Hide file tree
Showing 56 changed files with 727 additions and 451 deletions.
3 changes: 1 addition & 2 deletions .esopts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
-Dtests.heap.size=4G
-Dtests.es.thread_pool.write.size=4
-Dtests.es.thread_pool.search.size=4
-Dtests.es.indices.query.bool.max_clause_count=4096
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- name: Setup Java
uses: actions/setup-java@v1
with:
java-version: 12.0.2
java-version: 14

- name: Setup Ruby
uses: actions/setup-ruby@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Setup Java
uses: actions/setup-java@v1
with:
java-version: 12.0.2
java-version: 14

- name: Setup Ruby
uses: actions/setup-ruby@v1
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ build
target
release.md
*.csv
.minio/*
!.minio/.keep

# Ignore Gradle GUI config
gradle-app.setting
Expand Down
Empty file added .minio/.keep
Empty file.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version = "2.0.0-RC6"
version = "2.5.2"
maxColumn = 140
11 changes: 5 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ core = $(pwd)/core
vpip = ./venv/bin/pip
vpy = ./venv/bin/python
gradle = ./gradlew
eslatest = es74x
eslatest = plugin
s3 = aws s3
build_bucket = s3://com-klibisz-elastiknn-builds/
dc = docker-compose
Expand Down Expand Up @@ -76,17 +76,17 @@ run/cluster: .mk/run-cluster

run/gradle:
cd testing && $(dc) down
$(gradle) run $(shell cat .esopts | xargs)
$(gradle) :plugin:run $(shell cat .esopts | xargs)

run/debug:
cd testing && $(dc) down
$(gradle) run $(shell cat .esopts | xargs) --debug-jvm
$(gradle) :plugin:run $(shell cat .esopts | xargs) --debug-jvm

run/kibana:
docker run --network host -e ELASTICSEARCH_HOSTS=http://localhost:9200 -p 5601:5601 -d --rm kibana:7.4.0
docker run --network host -e ELASTICSEARCH_HOSTS=http://localhost:9200 -p 5601:5601 -d --rm kibana:7.6.2
docker ps | grep kibana

run/demo/app: .mk/gradle-publish-local .mk/example-demo-sbt-docker-stage .mk/example-demo-sbt-docker-stage .mk/vm-max-map-count
run/demo: .mk/gradle-publish-local .mk/example-demo-sbt-docker-stage .mk/example-demo-sbt-docker-stage .mk/vm-max-map-count
cd examples/demo && \
PLAY_HTTP_SECRET_KEY=$(shell sha256sum ~/.ssh/id_rsa | cut -d' ' -f1) $(dc) up --build --detach

Expand Down Expand Up @@ -161,4 +161,3 @@ publish/docs: .mk/gradle-docs .mk/client-python-docs
publish/site: .mk/jekyll-site-build
mkdir -p docs/_site/docs
rsync -av --delete --exclude docs docs/_site/ $(site_srvr):$(site_main)

7 changes: 7 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
- Removed the internal vector caching and switched to `sun.misc.Unsafe` to speed up vector serialization and deserialization.
The result is that queries are actually faster _without_ caching than they previously were _with_ caching.
This also removes the protobuf dependency, which was previously used to serialize vectors.
- Upgraded Elasticsearch version from 7.4.0 to 7.6.2.
Attempted to use 7.7.1 but the Apache Lucene version used in 7.7.x introduces a performance regression ([Details](https://issues.apache.org/jira/browse/LUCENE-9378)).
Switching from Java 13 to 14 also yields a slight speedup for intersections on sparse vectors.
---
- Internal change from custom Lucene queries to FunctionScoreQueries. This reduces quite a bit of boilerplate code and
surface area for bugs and performance regressions.
- Add optional progress bar to Python ElastiknnModel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ trait ElastiknnClient[F[_]] extends AutoCloseable {
def putMapping(index: String, field: String, mapping: Mapping): F[Response[PutMappingResponse]] =
execute(ElastiknnRequests.putMapping(index, field, mapping))

def index(index: String,
field: String,
vecs: Seq[Vec],
ids: Option[Seq[String]] = None,
refresh: RefreshPolicy = RefreshPolicy.NONE): F[Response[BulkResponse]] = {
def index(index: String, field: String, vecs: Seq[Vec], ids: Option[Seq[String]] = None, refresh: RefreshPolicy = RefreshPolicy.NONE): F[Response[BulkResponse]] = {
val reqs = vecs.map(v => ElastiknnRequests.indexVec(index, field, v))
val withIds = ids match {
case Some(idSeq) if idSeq.length == reqs.length =>
Expand All @@ -48,8 +44,9 @@ trait ElastiknnClient[F[_]] extends AutoCloseable {

object ElastiknnClient {

def futureClient(host: String = "localhost", port: Int = 9200, strictFailure: Boolean = true)(
implicit ec: ExecutionContext): ElastiknnFutureClient = {
final case class StrictFailureException(message: String, cause: Throwable = None.orNull) extends RuntimeException(message, cause)

def futureClient(host: String = "localhost", port: Int = 9200, strictFailure: Boolean = true)(implicit ec: ExecutionContext): ElastiknnFutureClient = {
val rc: RestClient = RestClient.builder(new HttpHost(host, port)).build()
val jc: JavaClient = new JavaClient(rc)
new ElastiknnFutureClient {
Expand Down Expand Up @@ -78,27 +75,17 @@ object ElastiknnClient {
else
bulkResponseItems.head.error match {
case Some(err) =>
Some(
ElasticError(err.`type`,
err.reason,
Some(err.index_uuid),
Some(err.index),
Some(err.shard.toString),
Seq.empty,
None,
None,
None,
Seq.empty))
Some(ElasticError(err.`type`, err.reason, Some(err.index_uuid), Some(err.index), Some(err.shard.toString), Seq.empty, None, None, None, Seq.empty))
case None => findBulkError(bulkResponseItems.tail, acc)
}
if (res.isError) Left(res.error.asException)
else if (res.status != 200) Left(new RuntimeException(s"Returned non-200 response: [$res]"))
else if (res.status != 200) Left(StrictFailureException(s"Returned non-200 response: [$res]"))
else
res.result match {
case bulkResponse: BulkResponse if bulkResponse.hasFailures =>
findBulkError(bulkResponse.items) match {
case Some(err) => Left(err.asException)
case None => Left(new RuntimeException(s"Unknown bulk execution error in response $res"))
case None => Left(StrictFailureException(s"Unknown bulk execution error in response $res"))
}
case other => Right(other)
}
Expand Down
4 changes: 2 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ plugins {
dependencies {
runtime "org.scala-lang:scala-library:${scalaVersion}"
implementation "org.scala-lang:scala-library:${scalaVersion}"
implementation "io.circe:circe-generic_${scalaVersion}:${circeVersion}"
implementation "io.circe:circe-parser_${scalaVersion}:${circeVersion}"
compile "io.circe:circe-generic_${scalaVersion}:${circeVersion}"
compile "io.circe:circe-parser_${scalaVersion}:${circeVersion}"
compile "io.circe:circe-generic-extras_${scalaVersion}:${circeVersion}"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.klibisz.elastiknn.storage;

import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.PrivilegedAction;

/**
 * Serializes ints, int arrays and float arrays to and from byte arrays.
 *
 * Bulk array conversions use sun.misc.Unsafe#copyMemory for performance and therefore use the platform's
 * native byte order. Single ints use an explicit, variable-length little-endian encoding (1, 2 or 4 bytes).
 * This is largely a simplification of the UnsafeInput and UnsafeOutput classes from the Kryo library.
 */
public class UnsafeSerialization {

    public static final int numBytesInInt = 4;
    public static final int numBytesInFloat = 4;

    // Unsafe is obtained reflectively inside a privileged action so it also works under a security manager.
    private static final UnsafeUtil u = AccessController.doPrivileged((PrivilegedAction<UnsafeUtil>) () -> {
        try {
            return new UnsafeUtil();
        } catch (Exception ex) {
            throw new RuntimeException("Failed to initialize UnsafeSerialization", ex);
        }
    });

    /**
     * Writes a single int using the fewest of 1, 2 or 4 bytes, in little-endian order.
     *
     * The bytes are written with plain array stores. Writing a full 4-byte int through Unsafe into a 1- or
     * 2-byte array would write past the end of the array.
     *
     * @param i int to serialize.
     * @return Array of length 1 if |i| <= Byte.MAX_VALUE, length 2 if |i| <= Short.MAX_VALUE, otherwise length 4.
     */
    public static byte[] writeInt(final int i) {
        // Widen before abs: Math.abs(Integer.MIN_VALUE) is negative as an int, which would pick the 1-byte bucket.
        final long a = Math.abs((long) i);
        if (a <= Byte.MAX_VALUE) {
            return new byte[]{(byte) i};
        } else if (a <= Short.MAX_VALUE) {
            return new byte[]{(byte) i, (byte) (i >>> 8)};
        } else {
            return new byte[]{(byte) i, (byte) (i >>> 8), (byte) (i >>> 16), (byte) (i >>> 24)};
        }
    }

    /**
     * Reads an int written by {@link #writeInt(int)}.
     *
     * Only bytes that actually belong to the array are read. The highest-order byte present is sign-extended,
     * so negative values encoded in 1 or 2 bytes decode correctly. If the array is longer than 4 bytes only
     * the first 4 are used.
     *
     * @param barr little-endian encoded int; must contain at least one byte.
     * @return The decoded int.
     * @throws IllegalArgumentException if the array is empty.
     */
    public static int readInt(final byte[] barr) {
        if (barr.length == 0) {
            throw new IllegalArgumentException("Cannot read an int from an empty byte array");
        }
        final int n = Math.min(barr.length, numBytesInInt);
        // Start from the most significant byte present (implicitly sign-extended), then shift in lower bytes.
        int result = barr[n - 1];
        for (int k = n - 2; k >= 0; k--) {
            result = (result << 8) | (barr[k] & 0xFF);
        }
        return result;
    }

    /**
     * Writes ints to a byte array.
     * @param iarr ints to serialize.
     * @return Array of bytes with length (4 * iarr.length).
     * @throws ArithmeticException if (4 * iarr.length) does not fit in an int.
     */
    public static byte[] writeInts(final int[] iarr) {
        final int bytesLen = Math.multiplyExact(iarr.length, numBytesInInt);
        final byte[] buf = new byte[bytesLen];
        u.unsafe.copyMemory(iarr, u.intArrayOffset, buf, u.byteArrayOffset, bytesLen);
        return buf;
    }

    /**
     * Reads ints from a byte array.
     * @param barr bytes previously produced by {@link #writeInts(int[])}.
     * @param offset index of the first byte to read.
     * @param length number of bytes to read; trailing bytes that do not form a whole int are ignored.
     * @return Array of (length / 4) ints.
     * @throws IndexOutOfBoundsException if offset/length do not describe a range inside barr.
     */
    public static int[] readInts(final byte[] barr, final int offset, final int length) {
        checkRange(barr, offset, length);
        final int[] iarr = new int[length / numBytesInInt];
        // Copy only whole elements so the copy can never run past the end of the destination array.
        final long numBytes = (long) iarr.length * numBytesInInt;
        u.unsafe.copyMemory(barr, offset + u.byteArrayOffset, iarr, u.intArrayOffset, numBytes);
        return iarr;
    }

    /**
     * Writes floats to a byte array.
     * @param farr floats to serialize.
     * @return Array of bytes with length (4 * farr.length).
     * @throws ArithmeticException if (4 * farr.length) does not fit in an int.
     */
    public static byte[] writeFloats(final float[] farr) {
        final int bytesLen = Math.multiplyExact(farr.length, numBytesInFloat);
        final byte[] buf = new byte[bytesLen];
        u.unsafe.copyMemory(farr, u.floatArrayOffset, buf, u.byteArrayOffset, bytesLen);
        return buf;
    }

    /**
     * Reads floats from a byte array.
     * @param barr bytes previously produced by {@link #writeFloats(float[])}.
     * @param offset index of the first byte to read.
     * @param length number of bytes to read; trailing bytes that do not form a whole float are ignored.
     * @return Array of (length / 4) floats.
     * @throws IndexOutOfBoundsException if offset/length do not describe a range inside barr.
     */
    public static float[] readFloats(final byte[] barr, final int offset, final int length) {
        checkRange(barr, offset, length);
        final float[] farr = new float[length / numBytesInFloat];
        // Copy only whole elements so the copy can never run past the end of the destination array.
        final long numBytes = (long) farr.length * numBytesInFloat;
        u.unsafe.copyMemory(barr, offset + u.byteArrayOffset, farr, u.floatArrayOffset, numBytes);
        return farr;
    }

    /** Rejects ranges outside the source array; Unsafe performs no bounds checks of its own. */
    private static void checkRange(final byte[] barr, final int offset, final int length) {
        if (offset < 0 || length < 0 || length > barr.length - offset) {
            throw new IndexOutOfBoundsException(
                    String.format("Invalid range: offset=%d, length=%d, array length=%d", offset, length, barr.length));
        }
    }

    /** Holds the Unsafe instance and the base offsets of the array types used above. */
    private static class UnsafeUtil {
        public final Unsafe unsafe;
        public final long intArrayOffset;
        public final long floatArrayOffset;
        public final long byteArrayOffset;
        public UnsafeUtil() throws NoSuchFieldException, IllegalAccessException {
            // The singleton is only reachable through the private static field "theUnsafe".
            final Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            unsafe = (Unsafe) f.get(null);
            intArrayOffset = unsafe.arrayBaseOffset(int[].class);
            floatArrayOffset = unsafe.arrayBaseOffset(float[].class);
            byteArrayOffset = unsafe.arrayBaseOffset(byte[].class);
        }
    }

}
32 changes: 12 additions & 20 deletions core/src/main/java/com/klibisz/elastiknn/utils/ArrayUtils.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package com.klibisz.elastiknn.utils;

/**
* Java implementations of some particularly performance-critical code paths.
* Java implementations of some hot spots.
*/
public class ArrayUtils {

private static void unsortedException(int lit, int big) {
throw new IllegalArgumentException(String.format("Called on unsorted array: %d came after %d", lit, big));
}

public static int sortedIntersectionCount(int [] xs, int [] ys) {
/**
* Compute the number of intersecting (i.e. identical) elements between two int arrays.
* IMPORTANT: Assumes the given arrays are already sorted in ascending order and _does not_ check if this is true.
* If the given arrays are not sorted, the answer will be wrong.
* This is implemented in Java because for some reason Scala will Box the ints in some cases which is unnecessary
* and far slower.
* @param xs
* @param ys
* @return The number of identical elements in the two arrays. For example {1,2,3}, {2,3,4} would return 2.
*/
public static int sortedIntersectionCount(final int [] xs, final int [] ys) {
int n = 0;
int xi = 0;
int yi = 0;
int xmax = Integer.MIN_VALUE;
int ymax = Integer.MIN_VALUE;
while (xi < xs.length && yi < ys.length) {
int x = xs[xi];
int y = ys[yi];
if (x < xmax) unsortedException(x, xmax);
else xmax = x;
if (y < ymax) unsortedException(y, ymax);
else ymax = y;
if (x < y) xi += 1;
else if (x > y) yi += 1;
else {
Expand All @@ -30,14 +30,6 @@ public static int sortedIntersectionCount(int [] xs, int [] ys) {
yi += 1;
}
}
while(xi < xs.length) {
if (xs[xi] < xmax) unsortedException(xs[xi], xmax);
xi += 1;
}
while(yi < ys.length) {
if (ys[yi] < ymax) unsortedException(ys[yi], ymax);
yi += 1;
}
return n;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,21 @@ object ElasticsearchCodec { esc =>
implicit val cfg: Configuration = Configuration.default.withSnakeCaseMemberNames
ElasticsearchCodec(deriveConfiguredCodec)
}
implicit val emptyVec: ESC[Vec.Empty] = {
implicit val cfg: Configuration = Configuration.default.withStrictDecoding
ElasticsearchCodec(deriveConfiguredCodec)
}

implicit val vector: ESC[api.Vec] = new ESC[api.Vec] {
override def apply(t: Vec): Json = t match {
case ixv: Vec.Indexed => encode(ixv)
case sbv: Vec.SparseBool => encode(sbv)
case dfv: Vec.DenseFloat => encode(dfv)
case emp: Vec.Empty => encode(emp)
}
// TODO: Compare performance of .orElse to alternatives that just check for specific json keys.
override def apply(c: HCursor): Either[DecodingFailure, Vec] =
denseFloatVector(c).orElse(sparseBoolVector(c)).orElse(indexedVector(c))
sparseBoolVector(c).orElse(denseFloatVector(c)).orElse(indexedVector(c)).orElse(emptyVec(c))
}

implicit val mappingSparseBool: ESC[Mapping.SparseBool] = ElasticsearchCodec(deriveCodec)
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/com/klibisz/elastiknn/api/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package object api {
case other: SparseBool => trueIndices.deep == other.trueIndices.deep && totalIndices == other.totalIndices
case _ => false
}
override def toString: String = s"SparseBool(${trueIndices.take(3).mkString(",")},...,$totalIndices)"
override def toString: String = s"SparseBool(${trueIndices.take(3).mkString(",")},...,${trueIndices.length}/$totalIndices)"
}

object SparseBool {
Expand Down Expand Up @@ -57,7 +57,6 @@ package object api {
}
dp
}

}

object DenseFloat {
Expand All @@ -69,6 +68,9 @@ package object api {
}

final case class Indexed(index: String, id: String, field: String) extends Vec

private[elastiknn] final case class Empty() extends Vec

}

sealed trait Mapping {
Expand Down
Loading

0 comments on commit ac08eab

Please sign in to comment.