Commit 63b567b

Feature: Row Level Results (#451)
* Demo implementation of returning row-level results from metrics
* Row-level results from VerificationResult
* Row-level results from VerificationResult
* Fix some tests by expecting a full result column
* Fix Deequ tests to expect full Completeness result
* Checks can return row-level result column names, if any
* Make Analyzer and Constraint classes serializable explicitly
* Refactor tests
* Move row-level management to trait
* MaxLength analyzer returns length of each record
* Refactor VerificationResult to correctly match Metrics to Analyzers
* VerificationResult aggregates all columns for a check
* Return row-level results for two constraints
* Improve naming and comments

---------

Co-authored-by: Yannis Mentekidis <mentekid@amazon.com>
1 parent ef4c308 commit 63b567b

22 files changed: +453 -49 lines

deequ-scalastyle.xml  (+1, -1)

@@ -16,7 +16,7 @@
 
  <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
   <parameters>
-   <parameter name="maxLineLength"><![CDATA[100]]></parameter>
+   <parameter name="maxLineLength"><![CDATA[120]]></parameter>
    <parameter name="tabSize"><![CDATA[2]]></parameter>
    <parameter name="ignoreImports">true</parameter>
   </parameters>

src/main/scala/com/amazon/deequ/VerificationResult.scala  (+61)

@@ -19,8 +19,15 @@ package com.amazon.deequ
 import com.amazon.deequ.analyzers.Analyzer
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
 import com.amazon.deequ.checks.{Check, CheckResult, CheckStatus}
+import com.amazon.deequ.constraints.AnalysisBasedConstraint
+import com.amazon.deequ.constraints.ConstraintResult
+import com.amazon.deequ.constraints.NamedConstraint
+import com.amazon.deequ.constraints.RowLevelAssertedConstraint
+import com.amazon.deequ.constraints.RowLevelConstraint
+import com.amazon.deequ.metrics.FullColumn
 import com.amazon.deequ.metrics.Metric
 import com.amazon.deequ.repository.SimpleResultSerde
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 /**
@@ -70,6 +77,23 @@ object VerificationResult {
       "constraint_status", "constraint_message")
   }
 
+  /**
+    * For each check in the verification suite, adds a column of row-level results
+    * to the input data if that check contains a column.
+    *
+    * Accepts a naming rule
+    */
+  def rowLevelResultsAsDataFrame(
+      sparkSession: SparkSession,
+      verificationResult: VerificationResult,
+      data: DataFrame): DataFrame = {
+
+    val columnNamesToMetrics: Map[String, Column] = verificationResultToColumn(verificationResult)
+
+    columnNamesToMetrics.foldLeft(data)(
+      (data, newColumn: (String, Column)) => data.withColumn(newColumn._1, newColumn._2))
+  }
+
   def checkResultsAsJson(verificationResult: VerificationResult,
                          forChecks: Seq[Check] = Seq.empty): String = {
 
@@ -90,6 +114,43 @@ object VerificationResult {
     SimpleResultSerde.serialize(checkResults)
   }
 
+  /**
+    * Returns a column for each check whose values are the result of each of the check's constraints
+    */
+  private def verificationResultToColumn(verificationResult: VerificationResult): Map[String, Column] = {
+    verificationResult.checkResults.flatMap(pair => columnForCheckResult(pair._1, pair._2))
+  }
+
+  private def columnForCheckResult(check: Check, checkResult: CheckResult): Option[(String, Column)] = {
+    // Convert non-boolean columns to boolean by using the assertion
+
+    val metrics: Seq[Column] = checkResult.constraintResults.flatMap(constraintResultToColumn)
+    if (metrics.isEmpty) {
+      None
+    } else {
+      Some(check.description, metrics.reduce(_ and _))
+    }
+  }
+
+  private def constraintResultToColumn(constraintResult: ConstraintResult): Option[Column] = {
+    val constraint = constraintResult.constraint
+    constraint match {
+      case asserted: RowLevelAssertedConstraint =>
+        constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_))
+      case _: RowLevelConstraint =>
+        constraintResult.metric.flatMap(metricToColumn)
+      case _ => None
+    }
+  }
+
+  private def metricToColumn(metric: Metric[_]): Option[Column] = {
+    metric match {
+      case fullColumn: FullColumn => fullColumn.fullColumn
+      case _ => None
+    }
+  }
+
+
   private[this] def getSimplifiedCheckResultOutput(
       verificationResult: VerificationResult)
     : Seq[SimpleCheckResultOutput] = {
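
A minimal usage sketch of the new entry point (not part of the diff), assuming a SparkSession named spark and an input DataFrame data with a String column "name"; the check description is illustrative:

  // Sketch only: build a check whose constraints carry row-level results, run it,
  // then annotate the input data with one boolean column per check.
  import com.amazon.deequ.{VerificationResult, VerificationSuite}
  import com.amazon.deequ.checks.{Check, CheckLevel}

  val check = Check(CheckLevel.Error, "integrity checks")
    .isComplete("name")              // wired to a RowLevelConstraint in this commit
    .hasMaxLength("name", _ <= 20)   // wired to a RowLevelAssertedConstraint

  val result = VerificationSuite()
    .onData(data)
    .addCheck(check)
    .run()

  // Adds a column named after the check description ("integrity checks") that is
  // true for rows passing all of the check's row-level constraints.
  val annotated = VerificationResult.rowLevelResultsAsDataFrame(spark, result, data)
  annotated.show()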

src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala  (+13, -7)

@@ -22,6 +22,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import com.amazon.deequ.analyzers.runners._
+import com.amazon.deequ.metrics.FullColumn
 
 import scala.language.existentials
 import scala.util.{Failure, Success}
@@ -53,7 +54,7 @@ trait DoubleValuedState[S <: DoubleValuedState[S]] extends State[S] {
 }
 
 /** Common trait for all analyzers which generates metrics from states computed on data frames */
-trait Analyzer[S <: State[_], +M <: Metric[_]] {
+trait Analyzer[S <: State[_], +M <: Metric[_]] extends Serializable {
 
   /**
     * Compute the state (sufficient statistics) from the data
@@ -206,7 +207,11 @@ abstract class StandardScanShareableAnalyzer[S <: DoubleValuedState[_]](
   override def computeMetricFrom(state: Option[S]): DoubleMetric = {
     state match {
       case Some(theState) =>
-        metricFromValue(theState.metricValue(), name, instance, entity)
+        val col = theState match {
+          case withColumn: FullColumn => withColumn.fullColumn
+          case _ => None
+        }
+        metricFromValue(theState.metricValue(), name, instance, entity, col)
       case _ =>
         metricFromEmpty(this, name, instance, entity)
     }
@@ -227,11 +232,11 @@ abstract class StandardScanShareableAnalyzer[S <: DoubleValuedState[_]](
 
 /** A state for computing ratio-based metrics,
   * contains #rows that match a predicate and overall #rows */
-case class NumMatchesAndCount(numMatches: Long, count: Long)
-  extends DoubleValuedState[NumMatchesAndCount] {
+case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullColumn: Option[Column] = None)
+  extends DoubleValuedState[NumMatchesAndCount] with FullColumn {
 
   override def sum(other: NumMatchesAndCount): NumMatchesAndCount = {
-    NumMatchesAndCount(numMatches + other.numMatches, count + other.count)
+    NumMatchesAndCount(numMatches + other.numMatches, count + other.count, sum(fullColumn, other.fullColumn))
   }
 
   override def metricValue(): Double = {
@@ -472,10 +477,11 @@ private[deequ] object Analyzers {
       value: Double,
       name: String,
       instance: String,
-      entity: Entity.Value = Entity.Column)
+      entity: Entity.Value = Entity.Column,
+      fullColumn: Option[Column] = None)
     : DoubleMetric = {
 
-    DoubleMetric(entity, name, instance, Success(value))
+    DoubleMetric(entity, name, instance, Success(value), fullColumn)
   }
 
   def emptyStateException(analyzer: Analyzer[_, _]): EmptyStateException = {

src/main/scala/com/amazon/deequ/analyzers/Completeness.scala  (+7, -2)

@@ -20,6 +20,8 @@ import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNotNested}
 import org.apache.spark.sql.functions.sum
 import org.apache.spark.sql.types.{IntegerType, StructType}
 import Analyzers._
+import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.{Column, Row}
 
 /** Completeness is the fraction of non-null values in a column of a DataFrame. */
@@ -30,13 +32,13 @@ case class Completeness(column: String, where: Option[String] = None) extends
   override def fromAggregationResult(result: Row, offset: Int): Option[NumMatchesAndCount] = {
 
     ifNoNullsIn(result, offset, howMany = 2) { _ =>
-      NumMatchesAndCount(result.getLong(offset), result.getLong(offset + 1))
+      NumMatchesAndCount(result.getLong(offset), result.getLong(offset + 1), Some(criterion))
     }
   }
 
   override def aggregationFunctions(): Seq[Column] = {
 
-    val summation = sum(conditionalSelection(column, where).isNotNull.cast(IntegerType))
+    val summation = sum(criterion.cast(IntegerType))
 
     summation :: conditionalCount(where) :: Nil
   }
@@ -46,4 +48,7 @@ case class Completeness(column: String, where: Option[String] = None) extends
   }
 
   override def filterCondition: Option[String] = where
+
+  @VisibleForTesting // required by some tests that compare analyzer results to an expected state
+  private[deequ] def criterion: Column = conditionalSelection(column, where).isNotNull
 }

src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala  (+4, -2)

@@ -27,12 +27,12 @@ case class MaxLength(column: String, where: Option[String] = None)
   with FilterableAnalyzer {
 
   override def aggregationFunctions(): Seq[Column] = {
-    max(length(conditionalSelection(column, where))).cast(DoubleType) :: Nil
+    max(criterion) :: Nil
   }
 
   override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = {
     ifNoNullsIn(result, offset) { _ =>
-      MaxState(result.getDouble(offset))
+      MaxState(result.getDouble(offset), Some(criterion))
     }
   }
 
@@ -41,4 +41,6 @@ case class MaxLength(column: String, where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  private def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType)
 }

src/main/scala/com/amazon/deequ/analyzers/Maximum.scala  (+4, -2)

@@ -21,11 +21,13 @@ import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.types.{DoubleType, StructType}
 import Analyzers._
+import com.amazon.deequ.metrics.FullColumn
 
-case class MaxState(maxValue: Double) extends DoubleValuedState[MaxState] {
+case class MaxState(maxValue: Double, override val fullColumn: Option[Column] = None)
+  extends DoubleValuedState[MaxState] with FullColumn {
 
   override def sum(other: MaxState): MaxState = {
-    MaxState(math.max(maxValue, other.maxValue))
+    MaxState(math.max(maxValue, other.maxValue), sum(fullColumn, other.fullColumn))
   }
 
   override def metricValue(): Double = {

src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala  (-1)

@@ -17,7 +17,6 @@
 package org.apache.spark.sql
 
 
-import com.amazon.deequ.analyzers.KLLSketch
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, StatefulApproxQuantile, StatefulHyperloglogPlus}
 import org.apache.spark.sql.catalyst.expressions.Literal
 
src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ object AnalysisRunner {
7979
}
8080

8181
/**
82-
* Compute the metrics from the analyzers configured in the analyis
82+
* Compute the metrics from the analyzers configured in the analysis
8383
*
8484
* @param data data on which to operate
8585
* @param analyzers the analyzers to run
@@ -169,7 +169,7 @@ object AnalysisRunner {
169169
// TODO this can be further improved, we can get the number of rows from other metrics as well
170170
// TODO we could also insert an extra Size() computation if we have to scan the data anyways
171171
var numRowsOfData = nonGroupedMetrics.metric(Size()).collect {
172-
case DoubleMetric(_, _, _, Success(value: Double)) => value.toLong
172+
case DoubleMetric(_, _, _, Success(value: Double), None) => value.toLong
173173
}
174174

175175
var groupedMetrics = AnalyzerContext.empty

src/main/scala/com/amazon/deequ/checks/Check.scala  (+12)

@@ -63,6 +63,18 @@ case class Check(
     description: String,
     private[deequ] val constraints: Seq[Constraint] = Seq.empty) {
 
+  /**
+    * Returns the name of the columns where each Constraint puts row-level results, if any
+    *
+    */
+  def getRowLevelConstraintColumnNames(): Seq[String] = {
+    constraints.flatMap(c => {
+      c match {
+        case c: RowLevelConstraint => Some(c.getColumnName)
+        case _ => None
+      }
+    })
+  }
 
   /**
     * Returns a new Check object with the given constraint added to the constraints list.
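
A small sketch (not part of the diff) of what the new accessor returns, given the constraint wiring added in Constraint.scala further down; the check description and column name are illustrative:

  import com.amazon.deequ.checks.{Check, CheckLevel}

  val check = Check(CheckLevel.Error, "review check")
    .isComplete("productName")   // becomes a RowLevelConstraint named "Completeness-productName"
    .hasSize(_ > 0)              // not a row-level constraint, so it contributes no column name

  // Expected to return Seq("Completeness-productName")
  check.getRowLevelConstraintColumnNames()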

src/main/scala/com/amazon/deequ/constraints/Constraint.scala  (+28, -3)

@@ -33,7 +33,7 @@ case class ConstraintResult(
     metric: Option[Metric[_]] = None)
 
 /** Common trait for all data quality constraints */
-trait Constraint {
+trait Constraint extends Serializable {
   def evaluate(analysisResults: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult
 }
 
@@ -68,6 +68,25 @@ class NamedConstraint(private[deequ] val constraint: Constraint, name: String)
   override def toString(): String = name
 }
 
+/**
+  * Constraint decorator which holds a name of the constraint and a name for the column-level result
+  *
+  * @param constraint Delegate
+  * @param name       Name (Detailed message) for the constraint
+  * @param columnName Name for the column containing row-level results for this constraint
+  */
+class RowLevelConstraint(private[deequ] override val constraint: Constraint, name: String, columnName: String)
+  extends NamedConstraint(constraint, name) {
+  val getColumnName: String = columnName
+}
+
+class RowLevelAssertedConstraint(private[deequ] override val constraint: Constraint,
+                                 name: String,
+                                 columnName: String,
+                                 val assertion: UserDefinedFunction)
+  extends RowLevelConstraint(constraint, name, columnName) {
+}
+
 /**
   * Companion object to create constraint objects
   * These methods can be used from the unit tests or during creation of Check configuration
@@ -170,7 +189,7 @@ object Constraint {
     val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
       completeness, assertion, hint = hint)
 
-    new NamedConstraint(constraint, s"CompletenessConstraint($completeness)")
+    new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-$column")
   }
 
   /**
@@ -414,7 +433,13 @@ object Constraint {
     val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion,
       hint = hint)
 
-    new NamedConstraint(constraint, s"MaxLengthConstraint($maxLength)")
+    val sparkAssertion = org.apache.spark.sql.functions.udf(assertion)
+
+    new RowLevelAssertedConstraint(
+      constraint,
+      s"MaxLengthConstraint($maxLength)",
+      s"ColumnLength-$column",
+      sparkAssertion)
   }
 
   /**

src/main/scala/com/amazon/deequ/metrics/Metric.scala  (+28, -5)

@@ -16,6 +16,8 @@
 
 package com.amazon.deequ.metrics
 
+import org.apache.spark.sql.Column
+
 import scala.util.{Failure, Success, Try}
 
 object Entity extends Enumeration {
@@ -37,13 +39,34 @@ trait Metric[T] {
   def flatten(): Seq[DoubleMetric]
 }
 
+/**
+  * Full-column metrics store the entire column of row-level pass/fail results
+  */
+trait FullColumn {
+  val fullColumn: Option[Column] = None
+
+  /**
+    * State::sum is used to combine two states, e.g. when the same analyzer has run on two parts
+    * of a dataset and then the states are combined to produce the state for the entire dataset.
+    * For FullColumn analyzers, their sum implementation should invoke this sum method to
+    * combine the columns.
+    *
+    * As Column is a Spark expression of a transformation on data, rather than the data itself,
+    * the sum of two Spark columns whose expression equal to each other is the expression.
+    * The sum of two different Spark columns is not defined, so an empty Option is returned.
+    */
+  def sum(colA: Option[Column], colB: Option[Column]): Option[Column] =
+    if (colA.equals(colB)) colA else None
+}
+
 /** Common trait for all data quality metrics where the value is double */
 case class DoubleMetric(
-  entity: Entity.Value,
-  name: String,
-  instance: String,
-  value: Try[Double])
-  extends Metric[Double] {
+    entity: Entity.Value,
+    name: String,
+    instance: String,
+    value: Try[Double],
+    override val fullColumn: Option[Column] = None)
+  extends Metric[Double] with FullColumn {
 
   override def flatten(): Seq[DoubleMetric] = Seq(this)
 }
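
To illustrate the merge semantics of FullColumn.sum, a sketch (not part of the diff) using NumMatchesAndCount from Analyzer.scala above; the column names are illustrative:

  import com.amazon.deequ.analyzers.NumMatchesAndCount
  import org.apache.spark.sql.functions.col

  // Two partial states built from the same criterion: the merged state keeps the
  // column, because the two Column expressions compare equal.
  val partA = NumMatchesAndCount(8, 10, Some(col("name").isNotNull))
  val partB = NumMatchesAndCount(5, 5, Some(col("name").isNotNull))
  partA.sum(partB).fullColumn   // Some((name IS NOT NULL))

  // Partial states built from different criteria: the merged column is undefined.
  val partC = NumMatchesAndCount(3, 4, Some(col("age").isNotNull))
  partA.sum(partC).fullColumn   // None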
