
Commit e7d3136

Toward release 0.5.0 - fast TDigest and Spark-3 Aggregator API (#20)
* update isarn-sketches dep to 0.2.2
* test design with thin shim class for new fast TDigest to clean up the API
* update initial commands
* scope t digest shim class, TDigestAggregator companion obj
* bump isarn-sketches to 0.3.0
* example of a java/python binding
* modify python tdigest UDT and a test UDF
* ScalarNumeric, data type functions, python renaming, commenting out old code
* spark 3.0 supports scala 2.12 only
* http -> https
* TDigestArrayAggregator
* array function overloadings
* add instructions for cleaning out ivy on local publish
* spark vector aggregations
* no longer need UDT for tdigest array
* old tdigest UDTs are obsolete
* remove package object
* sketches.spark.tdigest._
* tdigest.scala
* TDigestReduceAggregator
* TDigestArrayReduceAggregator
* TDigestUDAF.scala is obsolete
* TDigestArrayReduceAggregator inherit from TDigestArrayAggregatorBase
* factor out compression and maxdiscrete from TDigestArrayAggregatorBase
* /udaf/ -> /spark/
* /udaf/ -> /spark/
* move python TDigestUDT into spark/tdigest.py
* update sbt build mappings for python refactor
* update readme python for new organization
* copyright
* unused imports
* more unused imports
* switch to fast java TDigest
* explicit import of JavaPredictionModel
* /pipelines/ -> /pipelines/spark/
* python/isarnproject/pipelines/__init__.py
* update build mappings for new python organization
* update package paths for new organization
* fix package object path
* update copyright
* update pyspark tdigest to be cleaner and analogous to java implementation
* spark pipeline param delta -> compression
* fi.scala
* update assembly dep and move it into plugins
* add scaladoc
* move ScalarNumeric out of tdigest specific package
* update README examples for scala
* spark.sparkContext
* update python tdigest examples
* update feature importance examples
* isarn-sketches-java
* utest harness for spark testing
* TDigestAggregator test
* TDigestAggregator test
* KSD cumulative distribution divergence measure for unit testing
* test counts
* BigDecimal range
* local build against spark-3.0.1-SNAPSHOT
* test TDigestArrayAggregator
* tests for spark ML vector types
* cache test data sets
* test tdigest reducing aggregators
* epsD
* move approx to base class
* disable parallel test execution to prevent spark cluster teardown race conditions
* feature importance unit test
* build against spark 3.0.1
* xsbt -> sbt
* 0.5.0
1 parent a8ec2b4 commit e7d3136
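
The headline change is replacing the old UDAF/UDT-based t-digest support with Spark 3 Aggregator-based UDFs backed by the fast isarn-sketches-java TDigest. A minimal sketch of how the new API is intended to be used from a Spark shell, assuming the TDigestAggregator companion object exposes a udf factory with compression/maxDiscrete parameters (the data and column names here are illustrative, not taken verbatim from this commit):

import scala.util.Random.{nextInt, nextGaussian}
import org.isarnproject.sketches.java.TDigest
import org.isarnproject.sketches.spark._

// assumes an active SparkSession named `spark` (e.g. spark-shell, or the REPL configured below)
import spark.implicits._

// toy data: one discrete column and one continuous column
val data = spark.createDataFrame(Vector.fill(1000) { (nextInt(10), nextGaussian) })
  .toDF("j", "x")

// Aggregator-based sketching UDFs (compression and maxDiscrete are assumed parameter names)
val udfJ = TDigestAggregator.udf[Int](maxDiscrete = 25)
val udfX = TDigestAggregator.udf[Double](compression = 0.5)

val row = data.agg(udfJ($"j"), udfX($"x")).first
val td = row.getAs[TDigest](1)       // sketch of column "x"
val median = td.cdfInverse(0.5)      // approximate median via inverse CDF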

20 files changed: +1642, -1573 lines

README.md

Lines changed: 211 additions & 303 deletions
Large diffs are not rendered by default.

build.sbt

Lines changed: 28 additions & 19 deletions
@@ -1,26 +1,28 @@
-// xsbt clean unidoc previewSite
-// xsbt clean unidoc ghpagesPushSite
-// xsbt +publish
+// sbt clean unidoc previewSite
+// sbt clean unidoc ghpagesPushSite
+// sbt +publish
 // https://oss.sonatype.org
 // make sure sparkVersion is set as you want prior to +publish
+// when doing localPublish, also do:
+// rm -rf /home/eje/.ivy2/local/org.isarnproject /home/eje/.ivy2/cache/org.isarnproject
 
 import scala.sys.process._
 
 name := "isarn-sketches-spark"
 
 organization := "org.isarnproject"
 
-val packageVersion = "0.4.1-SNAPSHOT"
+val packageVersion = "0.5.0"
 
-val sparkVersion = "3.0.0"
+val sparkVersion = "3.0.1"
 
 val sparkSuffix = s"""sp${sparkVersion.split('.').take(2).mkString(".")}"""
 
 version := s"${packageVersion}-${sparkSuffix}"
 
 scalaVersion := "2.12.11"
 
-crossScalaVersions := Seq("2.12.11") // scala 2.12.11 when spark supports it
+crossScalaVersions := Seq("2.12.11")
 
 pomIncludeRepository := { _ => false }
 
@@ -54,14 +56,22 @@ developers := List(
   )
 )
 
+resolvers += Resolver.mavenLocal
+
 libraryDependencies ++= Seq(
-  "org.isarnproject" %% "isarn-sketches" % "0.1.2",
+  "org.isarnproject" % "isarn-sketches-java" % "0.3.0",
   "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
   "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
   "org.apache.spark" %% "spark-mllib" % sparkVersion % Provided,
-  "org.isarnproject" %% "isarn-scalatest" % "0.0.3" % Test,
-  "org.scalatest" %% "scalatest" % "3.0.5" % Test,
-  "org.apache.commons" % "commons-math3" % "3.6.1" % Test)
+  "com.lihaoyi" %% "utest" % "0.7.4" % Test)
+
+// tell sbt about utest
+testFrameworks += new TestFramework("utest.runner.Framework")
+
+// default is to run tests in parallel, asynchronously, but
+// that breaks both spark-cluster setup and teardown, and also breaks
+// repeatability of the random data generation
+parallelExecution in Test := false
 
 initialCommands in console := """
   |import org.apache.spark.SparkConf
@@ -70,10 +80,10 @@ initialCommands in console := """
   |import org.apache.spark.SparkContext._
   |import org.apache.spark.rdd.RDD
   |import org.apache.spark.ml.linalg.Vectors
-  |import org.isarnproject.sketches.TDigest
-  |import org.isarnproject.sketches.udaf._
-  |import org.apache.spark.isarnproject.sketches.udt._
-  |val initialConf = new SparkConf().setAppName("repl").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer", "16mb")
+  |import org.apache.spark.sql.functions._
+  |import org.isarnproject.sketches.java.TDigest
+  |import org.isarnproject.sketches.spark._
+  |val initialConf = new SparkConf().setAppName("repl")
   |val spark = SparkSession.builder.config(initialConf).master("local[2]").getOrCreate()
   |import spark._, spark.implicits._
   |val sc = spark.sparkContext
@@ -90,12 +100,11 @@ scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
 mappings in (Compile, packageBin) ++= Seq(
   (baseDirectory.value / "python" / "isarnproject" / "__init__.py") -> "isarnproject/__init__.py",
   (baseDirectory.value / "python" / "isarnproject" / "pipelines" / "__init__.py") -> "isarnproject/pipelines/__init__.py",
-  (baseDirectory.value / "python" / "isarnproject" / "pipelines" / "fi.py") -> "isarnproject/pipelines/fi.py",
+  (baseDirectory.value / "python" / "isarnproject" / "pipelines" / "spark" / "__init__.py") -> "isarnproject/pipelines/spark/__init__.py",
+  (baseDirectory.value / "python" / "isarnproject" / "pipelines" / "spark" / "fi.py") -> "isarnproject/pipelines/spark/fi.py",
   (baseDirectory.value / "python" / "isarnproject" / "sketches" / "__init__.py") -> "isarnproject/sketches/__init__.py",
-  (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "__init__.py") -> "isarnproject/sketches/udaf/__init__.py",
-  (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "tdigest.py") -> "isarnproject/sketches/udaf/tdigest.py",
-  (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "__init__.py") -> "isarnproject/sketches/udt/__init__.py",
-  (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "tdigest.py") -> "isarnproject/sketches/udt/tdigest.py"
+  (baseDirectory.value / "python" / "isarnproject" / "sketches" / "spark" / "__init__.py") -> "isarnproject/sketches/spark/__init__.py",
+  (baseDirectory.value / "python" / "isarnproject" / "sketches" / "spark" / "tdigest.py") -> "isarnproject/sketches/spark/tdigest.py",
 )
 
 test in assembly := {}
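
A note on the new test settings above: the utest dependency plus the testFrameworks registration is what lets sbt discover utest suites, and parallelExecution in Test := false runs suites sequentially so a shared local SparkSession is not set up and torn down concurrently. A minimal, hypothetical suite shape that this configuration would pick up (not a test from this commit):

import utest._
import org.apache.spark.sql.SparkSession

object ExampleSparkSuite extends TestSuite {
  // one local SparkSession shared by the whole suite; sequential execution
  // avoids the cluster setup/teardown races mentioned in the build comment
  val spark = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

  val tests = Tests {
    test("sanity") {
      val n = spark.range(100).count()
      assert(n == 100L)
    }
  }
}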

project/assembly.sbt

Lines changed: 0 additions & 1 deletion
This file was deleted.

project/plugins.sbt

Lines changed: 4 additions & 2 deletions
@@ -1,11 +1,11 @@
 resolvers += Resolver.url(
   "bintray-sbt-plugin-releases",
-  url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
+  url("https://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
   Resolver.ivyStylePatterns)
 
 resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
 
-resolvers += "jgit-repo" at "http://download.eclipse.org/jgit/maven"
+resolvers += "jgit-repo" at "https://download.eclipse.org/jgit/maven"
 
 addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.6.3")
 
@@ -15,6 +15,8 @@ addSbtPlugin("io.crashbox" % "sbt-gpg" % "0.2.1")
 
 addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.2")
 
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
+
 // scoverage and coveralls deps are at old versions to avoid a bug in the current versions
 // update these when this fix is released: https://github.com/scoverage/sbt-coveralls/issues/73
 //addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
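
The new sbt-assembly plugin pairs with the test in assembly := {} setting in build.sbt: running sbt assembly builds a fat jar without re-running the test suite, and the Spark dependencies stay out of that jar because they are marked Provided. (This is standard sbt-assembly behavior, noted here for context rather than something specific to this commit.)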

python/isarnproject/pipelines/fi.py renamed to python/isarnproject/pipelines/spark/fi.py

Lines changed: 10 additions & 10 deletions
@@ -1,7 +1,7 @@
 from pyspark import since, keyword_only
 from pyspark.ml.param.shared import *
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper, JavaPredictionModel
 from pyspark.ml.common import inherit_doc
 from pyspark.sql import DataFrame
 
@@ -14,19 +14,19 @@ def toPredictionModel(value):
     raise TypeError("object %s was not a JavaPredictionModel" % (value))
 
 class TDigestParams(Params):
-    delta = Param(Params._dummy(), "delta", "tdigest compression parameter",
+    compression = Param(Params._dummy(), "compression", "tdigest compression parameter",
         typeConverter=TypeConverters.toFloat)
     maxDiscrete = Param(Params._dummy(), "maxDiscrete", "maximum discrete values",
         typeConverter=TypeConverters.toInt)
 
     def __init__(self):
         super(TDigestParams, self).__init__()
 
-    def setDelta(self, value):
-        return self._set(delta=value)
+    def setCompression(self, value):
+        return self._set(compression=value)
 
-    def getDelta(self):
-        return self.getOrDefault(self.delta)
+    def getCompression(self):
+        return self.getOrDefault(self.compression)
 
     def setMaxDiscrete(self, value):
         return self._set(maxDiscrete=value)
@@ -90,15 +90,15 @@ class TDigestFI(JavaEstimator, TDigestFIParams, JavaMLWritable, JavaMLReadable):
     """
 
     @keyword_only
-    def __init__(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"):
+    def __init__(self, compression = 0.5, maxDiscrete = 0, featuresCol = "features"):
         super(TDigestFI, self).__init__()
-        self._java_obj = self._new_java_obj("org.isarnproject.pipelines.TDigestFI", self.uid)
-        self._setDefault(delta = 0.5, maxDiscrete = 0, featuresCol = "features")
+        self._java_obj = self._new_java_obj("org.isarnproject.pipelines.spark.fi.TDigestFI", self.uid)
+        self._setDefault(compression = 0.5, maxDiscrete = 0, featuresCol = "features")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
     @keyword_only
-    def setParams(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"):
+    def setParams(self, compression = 0.5, maxDiscrete = 0, featuresCol = "features"):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

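To illustrate the delta -> compression rename on the JVM side that this Python wrapper binds to, here is a hedged sketch; it assumes the relocated Scala estimator org.isarnproject.pipelines.spark.fi.TDigestFI exposes the conventional setCompression/setMaxDiscrete/setFeaturesCol setters, and the training DataFrame is illustrative:

import org.isarnproject.pipelines.spark.fi.TDigestFI

// training: a DataFrame with a "features" vector column (assumed to exist)
val fi = new TDigestFI()
  .setCompression(0.3)      // was setDelta(0.3) before this commit
  .setMaxDiscrete(10)
  .setFeaturesCol("features")
val fiModel = fi.fit(training)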