Skip to content

Commit 00a7c3f

Browse files
documentation for 0.1.0 (#2)
* scaladoc * fix bug in TDigestArrayUDAF, forgot to re-store updated buffer * version 0.1.0 * t-digest UDAF examples
1 parent 41f6da1 commit 00a7c3f

File tree

5 files changed

+268
-3
lines changed

5 files changed

+268
-3
lines changed

README.md

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,143 @@
11
# isarn-sketches-spark
22
Routines and data structures for using isarn-sketches idiomatically in Apache Spark
3+
4+
## API documentation
5+
https://isarn.github.io/isarn-sketches-spark/latest/api/#org.isarnproject.sketches.udaf.package
6+
7+
## How to use in your project
8+
9+
#### sbt
10+
``` scala
11+
resolvers += "isarn project" at "https://dl.bintray.com/isarn/maven/"
12+
13+
libraryDependencies += "org.isarnproject" %% "isarn-sketches-spark" % "0.1.0"
14+
```
15+
16+
#### maven
17+
``` xml
18+
<dependency>
19+
<groupId>org.isarnproject</groupId>
20+
<artifactId>isarn-sketches-spark_2.10</artifactId>
21+
<version>0.1.0</version>
22+
<type>pom</type>
23+
</dependency>
24+
```
25+
26+
## Examples
27+
28+
### Sketch a numeric column
29+
```scala
30+
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
31+
import org.isarnproject.sketches._
32+
import org.isarnproject.sketches.udaf._
33+
import org.apache.spark.isarnproject.sketches.udt._
34+
35+
scala> import scala.util.Random.nextGaussian
36+
import scala.util.Random.nextGaussian
37+
38+
scala> val data = sc.parallelize(Vector.fill(1000){(nextGaussian, nextGaussian)}).toDF.as[(Double, Double)]
39+
data: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: double]
40+
41+
scala> val udaf = tdigestUDAF[Double].delta(0.2).maxDiscrete(25)
42+
udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.2,25)
43+
44+
scala> val agg = data.agg(udaf($"_1"), udaf($"_2"))
45+
agg: org.apache.spark.sql.DataFrame = [tdigestudaf(_1): tdigest, tdigestudaf(_2): tdigest]
46+
47+
scala> val (td1, td2) = (agg.first.getAs[TDigestSQL](0).tdigest, agg.first.getAs[TDigestSQL](1).tdigest)
48+
td1: org.isarnproject.sketches.TDigest = TDigest(0.2,25,151,TDigestMap(-3.1241237514093707 -> (1.0, 1.0), ...
49+
50+
scala> td1.cdf(0)
51+
res1: Double = 0.5159531867457404
52+
53+
scala> td2.cdf(0)
54+
res2: Double = 0.504233763693618
55+
```
56+
57+
### Sketch a numeric array column
58+
```scala
59+
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
60+
import org.isarnproject.sketches._
61+
import org.isarnproject.sketches.udaf._
62+
import org.apache.spark.isarnproject.sketches.udt._
63+
64+
scala> import scala.util.Random._
65+
import scala.util.Random._
66+
67+
scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vector.fill(5){nextGaussian})})
68+
data: org.apache.spark.sql.DataFrame = [_1: int, _2: array<double>]
69+
70+
scala> val udaf1 = tdigestUDAF[Int].maxDiscrete(20)
71+
udaf1: org.isarnproject.sketches.udaf.TDigestUDAF[Int] = TDigestUDAF(0.5,20)
72+
73+
scala> val udafA = tdigestArrayUDAF[Double]
74+
udafA: org.isarnproject.sketches.udaf.TDigestArrayUDAF[Double] = TDigestArrayUDAF(0.5,0)
75+
76+
scala> val (first1, firstA) = (data.agg(udaf1($"_1")).first, data.agg(udafA($"_2")).first)
77+
first1: org.apache.spark.sql.Row = [TDigestSQL(TDigest(0.5,20,19,TDigestMap(-9.0 -> (51.0, 51.0),...
78+
firstA: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@782b0d37)]
79+
80+
scala> val sample1 = Vector.fill(10) { first1.getAs[TDigestSQL](0).tdigest.sample }
81+
sample1: scala.collection.immutable.Vector[Double] = Vector(0.0, 7.0, 9.0, 6.0, 1.0, 3.0, 4.0, 0.0, 9.0, 0.0)
82+
83+
scala> val sampleA = firstA.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
84+
sampleA: Array[Double] = Array(0.5079398036724695, 0.7518583956493221, -0.054376728126603546, 0.7141623682043323, 0.4788564991204228)
85+
```
86+
87+
### Sketch a column of ML Vector
88+
```scala
89+
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
90+
import org.isarnproject.sketches._
91+
import org.isarnproject.sketches.udaf._
92+
import org.apache.spark.isarnproject.sketches.udt._
93+
94+
scala> import org.apache.spark.ml.linalg.Vectors
95+
import org.apache.spark.ml.linalg.Vectors
96+
97+
scala> import scala.util.Random._
98+
import scala.util.Random._
99+
100+
scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
101+
data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
102+
103+
scala> val udafV = tdigestMLVecUDAF
104+
udafV: org.isarnproject.sketches.udaf.TDigestMLVecUDAF = TDigestMLVecUDAF(0.5,0)
105+
106+
scala> val firstV = data.agg(udafV($"_2")).first
107+
firstV: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@42b579cd)]
108+
109+
scala> val sampleV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
110+
sampleV: Array[Double] = Array(1.815862652134914, 0.24668895676164276, 0.09236479932949887)
111+
112+
scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
113+
medianV: Array[Double] = Array(-0.049806905959001196, -0.08528817932077674, -0.05291800642695017)
114+
```
115+
116+
### Sketch a column of MLLib Vector
117+
```scala
118+
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
119+
import org.isarnproject.sketches._
120+
import org.isarnproject.sketches.udaf._
121+
import org.apache.spark.isarnproject.sketches.udt._
122+
123+
scala> import org.apache.spark.mllib.linalg.Vectors
124+
import org.apache.spark.mllib.linalg.Vectors
125+
126+
scala> import scala.util.Random._
127+
import scala.util.Random._
128+
129+
scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
130+
data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
131+
132+
scala> val udafV = tdigestMLLibVecUDAF
133+
udafV: org.isarnproject.sketches.udaf.TDigestMLLibVecUDAF = TDigestMLLibVecUDAF(0.5,0)
134+
135+
scala> val firstV = data.agg(udafV($"_2")).first
136+
firstV: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@6bffea90)]
137+
138+
scala> val sampleV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
139+
sampleV: Array[Double] = Array(0.10298190759496548, -0.1968752746464183, -1.0139250851274562)
140+
141+
scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
142+
medianV: Array[Double] = Array(0.025820266848484798, 0.01951778217339037, 0.09701138847692858)
143+
```

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ organization := "org.isarnproject"
44

55
bintrayOrganization := Some("isarn")
66

7-
version := "0.1.0.rc1"
7+
version := "0.1.0"
88

99
scalaVersion := "2.11.8"
1010

@@ -31,7 +31,7 @@ def commonSettings = Seq(
3131
|import org.apache.spark.isarnproject.sketches.udt._
3232
|val initialConf = new SparkConf().setAppName("repl").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer", "16mb")
3333
|val spark = SparkSession.builder.config(initialConf).master("local[2]").getOrCreate()
34-
|import spark._
34+
|import spark._, spark.implicits._
3535
|val sc = spark.sparkContext
3636
|import org.apache.log4j.{Logger, ConsoleAppender, Level}
3737
|Logger.getRootLogger().getAppender("console").asInstanceOf[ConsoleAppender].setThreshold(Level.WARN)

src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,20 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArra
2020
import org.isarnproject.sketches.TDigest
2121
import org.isarnproject.sketches.tdmap.TDigestMap
2222

23+
/** A type for receiving the results of deserializing [[TDigestUDT]].
24+
* The payload is the tdigest member field, holding a TDigest object.
25+
 * This is necessary because (a) the TDigest type is defined in the isarn-sketches
26+
* package and I do not
27+
* want any Spark dependencies on that package, and (b) Spark has privatized UserDefinedType under
28+
* org.apache.spark scope, and so I have to have a paired result type in the same scope.
29+
* @param tdigest The TDigest payload, which does the actual sketching.
30+
*/
2331
@SQLUserDefinedType(udt = classOf[TDigestUDT])
2432
case class TDigestSQL(tdigest: TDigest)
2533

34+
/** A UserDefinedType for serializing and deserializing [[TDigestSQL]] structures during UDAF
35+
* aggregations.
36+
*/
2637
class TDigestUDT extends UserDefinedType[TDigestSQL] {
2738
def userClass: Class[TDigestSQL] = classOf[TDigestSQL]
2839

@@ -66,11 +77,21 @@ class TDigestUDT extends UserDefinedType[TDigestSQL] {
6677
}
6778
}
6879

80+
/** Singleton instance of [[TDigestUDT]] for use by UDAF objects */
6981
case object TDigestUDT extends TDigestUDT
7082

83+
/** A type for receiving the results of deserializing [[TDigestArrayUDT]].
84+
* The payload is the tdigests member field, holding an Array of TDigest objects.
85+
* @param tdigests An array of TDigest objects, which do the actual sketching.
86+
* @see [[TDigestSQL]] for additional context
87+
*/
7188
@SQLUserDefinedType(udt = classOf[TDigestArrayUDT])
7289
case class TDigestArraySQL(tdigests: Array[TDigest])
7390

91+
/**
92+
* A UserDefinedType for serializing and deserializing [[TDigestArraySQL]] objects
93+
* during UDAF aggregations
94+
*/
7495
class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] {
7596
def userClass: Class[TDigestArraySQL] = classOf[TDigestArraySQL]
7697

@@ -124,9 +145,10 @@ class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] {
124145
}
125146
}
126147

148+
/** A [[TDigestArrayUDT]] instance for use in declaring UDAF objects */
127149
case object TDigestArrayUDT extends TDigestArrayUDT
128150

129-
// VectorUDT is private[spark], but I can expose what I need this way:
151+
/** Shims for exposing Spark's VectorUDT objects outside of org.apache.spark scope */
130152
object TDigestUDTInfra {
131153
private val udtML = new org.apache.spark.ml.linalg.VectorUDT
132154
def udtVectorML: DataType = udtML

src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,21 @@ import org.isarnproject.sketches.TDigest
2323

2424
import org.apache.spark.isarnproject.sketches.udt._
2525

26+
/**
27+
* A UDAF for sketching numeric data with a TDigest.
28+
* Expected to be created using [[tdigestUDAF]].
29+
* @tparam N the expected numeric type of the data; Double, Int, etc
30+
* @param deltaV The delta value to be used by the TDigest object
31+
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
32+
*/
2633
case class TDigestUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
2734
num: Numeric[N],
2835
dataTpe: TDigestUDAFDataType[N]) extends UserDefinedAggregateFunction {
2936

37+
/** customize the delta value to be used by the TDigest object */
3038
def delta(deltaP: Double) = this.copy(deltaV = deltaP)
3139

40+
/** customize the maxDiscrete value to be used by the TDigest object */
3241
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
3342

3443
// A t-digest is deterministic, but it is only statistically associative or commutative
@@ -59,6 +68,7 @@ case class TDigestUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
5968
def evaluate(buf: Row): Any = buf.getAs[TDigestSQL](0)
6069
}
6170

71+
/** A base class that defines the common functionality for array sketching UDAFs */
6272
abstract class TDigestMultiUDAF extends UserDefinedAggregateFunction {
6373
def deltaV: Double
6474
def maxDiscreteV: Int
@@ -90,11 +100,19 @@ abstract class TDigestMultiUDAF extends UserDefinedAggregateFunction {
90100
def evaluate(buf: Row): Any = buf.getAs[TDigestArraySQL](0)
91101
}
92102

103+
/**
104+
* A UDAF for sketching a column of ML Vectors with an array of TDigest objects.
105+
* Expected to be created using [[tdigestMLVecUDAF]].
106+
* @param deltaV The delta value to be used by the TDigest object
107+
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
108+
*/
93109
case class TDigestMLVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMultiUDAF {
94110
import org.apache.spark.ml.linalg.{ Vector => Vec }
95111

112+
/** customize the delta value to be used by the TDigest object */
96113
def delta(deltaP: Double) = this.copy(deltaV = deltaP)
97114

115+
/** customize the maxDiscrete value to be used by the TDigest object */
98116
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
99117

100118
def inputSchema: StructType = StructType(StructField("vector", TDigestUDTInfra.udtVectorML) :: Nil)
@@ -124,11 +142,19 @@ case class TDigestMLVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMu
124142
}
125143
}
126144

145+
/**
146+
* A UDAF for sketching a column of MLLib Vectors with an array of TDigest objects.
147+
* Expected to be created using [[tdigestMLLibVecUDAF]].
148+
* @param deltaV The delta value to be used by the TDigest object
149+
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
150+
*/
127151
case class TDigestMLLibVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMultiUDAF {
128152
import org.apache.spark.mllib.linalg.{ Vector => Vec }
129153

154+
/** customize the delta value to be used by the TDigest object */
130155
def delta(deltaP: Double) = this.copy(deltaV = deltaP)
131156

157+
/** customize the maxDiscrete value to be used by the TDigest object */
132158
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
133159

134160
def inputSchema: StructType =
@@ -159,12 +185,21 @@ case class TDigestMLLibVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDiges
159185
}
160186
}
161187

188+
/**
189+
* A UDAF for sketching a column of numeric ArrayData with an array of TDigest objects.
190+
* Expected to be created using [[tdigestArrayUDAF]].
191+
* @tparam N the expected numeric type of the data; Double, Int, etc
192+
* @param deltaV The delta value to be used by the TDigest objects
193+
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest objects
194+
*/
162195
case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
163196
num: Numeric[N],
164197
dataTpe: TDigestUDAFDataType[N]) extends TDigestMultiUDAF {
165198

199+
/** customize the delta value to be used by the TDigest object */
166200
def delta(deltaP: Double) = this.copy(deltaV = deltaP)
167201

202+
/** customize the maxDiscrete value to be used by the TDigest object */
168203
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
169204

170205
def inputSchema: StructType =
@@ -183,6 +218,7 @@ case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
183218
if (x != null) tdigests(j) += num.toDouble(x)
184219
j += 1
185220
}
221+
buf(0) = TDigestArraySQL(tdigests)
186222
}
187223
}
188224
}

0 commit comments

Comments
 (0)