Commit d2551bc

rdsharma26 and fergonp authored
[Experimental] Addition of dataset comparison utilities (#449)
* Referential Integrity check and test, with Data Synchronization Check and Test
* remove .DS_Store files
* Cleaner versions of Referential Integrity and Data Synchronization checks and tests.
* save save
* Newest version of my three checks
* Version for code review, for all of my checks
* Final code review
* Pull request version of my code
* Pull request version of my code
* Final Version Pull Request
* remove .DS_Store files Duplicate
* .DS_Store banished!
* Removing
* Removings
* Delete DS_Stores
* Cleanup: Update parameter names, descriptions, remove unnecessary whitespace etc.
* Changes in accordance with comments on the pull request.
  - Added documentation for Data Sync APIs, with examples.
  - Updated the error messaging, based on different scenarios.

---------

Co-authored-by: Fernan Gonzalez <fergonp@amazon.com>
1 parent 63b567b commit d2551bc

6 files changed, +966 -0 lines changed

.gitignore

+2
@@ -5,3 +5,5 @@ target/
 .metals/
 .vscode/
 .bloop/
+.DS_Store
+
@@ -0,0 +1,21 @@
+/**
+ * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.comparison
+
+sealed trait ComparisonResult
+case class ComparisonFailed(errorMessage: String) extends ComparisonResult
+case class ComparisonSucceeded() extends ComparisonResult
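
Since ComparisonResult is a sealed trait, callers can pattern match on it exhaustively. The following is a minimal sketch of how a caller might consume a result. The three types are repeated locally so the snippet compiles on its own, and the `describe` helper is hypothetical (it is not part of this commit).

```scala
// Local copies of the types added in this commit, so the sketch is self-contained.
sealed trait ComparisonResult
case class ComparisonFailed(errorMessage: String) extends ComparisonResult
case class ComparisonSucceeded() extends ComparisonResult

object ComparisonResultSketch {
  // Hypothetical helper (not in the commit): turn a result into a log line.
  // The compiler can check that both subtypes are handled because the trait is sealed.
  def describe(result: ComparisonResult): String = result match {
    case ComparisonSucceeded() => "comparison succeeded"
    case ComparisonFailed(msg) => s"comparison failed: $msg"
  }

  def main(args: Array[String]): Unit = {
    println(describe(ComparisonSucceeded()))
    println(describe(ComparisonFailed("The row counts of the two data frames do not match.")))
  }
}
```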
@@ -0,0 +1,169 @@
+/**
+ * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.comparison
+
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.col
+
+/**
+ * Compares two DataFrames one-to-one on columns specified by the customer.
+ *
+ * This is an experimental utility.
+ *
+ * For example, consider the two dataframes below:
+ *
+ * DataFrame A:
+ *
+ * |--ID--|---City---|--State--|
+ * |  1   | New York |    NY   |
+ * |  2   | Chicago  |    IL   |
+ * |  3   | Boston   |    MA   |
+ *
+ * DataFrame B:
+ *
+ * |--CityID--|---City---|-----State-----|
+ * |    1     | New York |   New York    |
+ * |    2     | Chicago  |   Illinois    |
+ * |    3     | Boston   | Massachusetts |
+ *
+ * Note that dataframe A is almost equal to dataframe B, but for two things:
+ * 1) The ID column in B is called CityID
+ * 2) The State column in B is the full name, whereas A uses the abbreviation.
+ *
+ * To compare A with B, for just the City column, we can use the function like the following.
+ *
+ * DataSynchronization.columnMatch(
+ *   ds1 = dfA,
+ *   ds2 = dfB,
+ *   colKeyMap = Map("ID" -> "CityID"), // Mapping for the key columns
+ *   compCols = Map("City" -> "City"), // Mapping for the columns that should be compared
+ *   assertion = _ > 0.8
+ * )
+ *
+ * This will return ComparisonSucceeded, since the City column matches in A and B for the corresponding ID.
+ *
+ * To compare A with B, for all columns, we can use the function like the following.
+ *
+ * DataSynchronization.columnMatch(
+ *   ds1 = dfA,
+ *   ds2 = dfB,
+ *   colKeyMap = Map("ID" -> "CityID"), // Mapping for the key columns
+ *   assertion = _ > 0.8
+ * )
+ *
+ * This will return ComparisonFailed. The City column will match, but the State column will not.
+ */
+
+object DataSynchronization {
+  /**
+   * Compares two DataFrames on all of their non-key columns, after joining them on the key columns.
+   *
+   * @param ds1       The first data set which the customer will select for comparison.
+   * @param ds2       The second data set which the customer will select for comparison.
+   * @param colKeyMap A map of columns to columns used for joining the two datasets.
+   *                  The keys in the map are composite key forming columns from the first dataset.
+   *                  The value for each key is the equivalent column from the second dataset.
+   * @param assertion A function which accepts the match ratio and returns a Boolean.
+   * @return ComparisonResult An appropriate subtype of ComparisonResult is returned.
+   *                          Once all preconditions are met, we calculate the ratio of the rows
+   *                          that match and we run the assertion on that outcome.
+   *                          The response is then converted to ComparisonResult.
+   */
+  def columnMatch(ds1: DataFrame,
+                  ds2: DataFrame,
+                  colKeyMap: Map[String, String],
+                  assertion: Double => Boolean): ComparisonResult = {
+    if (areKeyColumnsValid(ds1, ds2, colKeyMap)) {
+      val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
+      val colsDS2 = ds2.columns.filterNot(x => colKeyMap.values.toSeq.contains(x)).sorted
+
+      if (!(colsDS1 sameElements colsDS2)) {
+        ComparisonFailed("Non key columns in the given data frames do not match.")
+      } else {
+        val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap
+        finalAssertion(ds1, ds2, mergedMaps, assertion)
+      }
+    } else {
+      ComparisonFailed("Provided key map not suitable for given data frames.")
+    }
+  }
+
+  /**
+   * Compares two DataFrames on the columns given in compCols, after joining them on the key columns.
+   *
+   * @param ds1       The first data set which the customer will select for comparison.
+   * @param ds2       The second data set which the customer will select for comparison.
+   * @param colKeyMap A map of columns to columns used for joining the two datasets.
+   *                  The keys in the map are composite key forming columns from the first dataset.
+   *                  The value for each key is the equivalent column from the second dataset.
+   * @param compCols  A map of columns to columns which we will check for equality, post joining.
+   * @param assertion A function which accepts the match ratio and returns a Boolean.
+   * @return ComparisonResult An appropriate subtype of ComparisonResult is returned.
+   *                          Once all preconditions are met, we calculate the ratio of the rows
+   *                          that match and we run the assertion on that outcome.
+   *                          The response is then converted to ComparisonResult.
+   */
+  def columnMatch(ds1: DataFrame,
+                  ds2: DataFrame,
+                  colKeyMap: Map[String, String],
+                  compCols: Map[String, String],
+                  assertion: Double => Boolean): ComparisonResult = {
+    if (areKeyColumnsValid(ds1, ds2, colKeyMap)) {
+      val mergedMaps = colKeyMap ++ compCols
+      finalAssertion(ds1, ds2, mergedMaps, assertion)
+    } else {
+      ComparisonFailed("Provided key map not suitable for given data frames.")
+    }
+  }
+
+  private def areKeyColumnsValid(ds1: DataFrame,
+                                 ds2: DataFrame,
+                                 colKeyMap: Map[String, String]): Boolean = {
+    // We verify that the key columns provided form a valid primary/composite key.
+    // To achieve this, we group each dataframe by its key columns and compare the group count with the row count.
+    // If the key columns provided are valid, then the two counts should match.
+    val ds1Unique = ds1.groupBy(colKeyMap.keys.toSeq.map(col): _*).count()
+    val ds2Unique = ds2.groupBy(colKeyMap.values.toSeq.map(col): _*).count()
+    (ds1Unique.count() == ds1.count()) && (ds2Unique.count() == ds2.count())
+  }
+
+  private def finalAssertion(ds1: DataFrame,
+                             ds2: DataFrame,
+                             mergedMaps: Map[String, String],
+                             assertion: Double => Boolean): ComparisonResult = {
+
+    val ds1Count = ds1.count()
+    val ds2Count = ds2.count()
+
+    if (ds1Count != ds2Count) {
+      ComparisonFailed(s"The row counts of the two data frames do not match.")
+    } else {
+      val joinExpression: Column = mergedMaps
+        .map { case (col1, col2) => ds1(col1) === ds2(col2)}
+        .reduce((e1, e2) => e1 && e2)
+
+      val joined = ds1.join(ds2, joinExpression, "inner")
+      val ratio = joined.count().toDouble / ds1Count
+
+      if (assertion(ratio)) {
+        ComparisonSucceeded()
+      } else {
+        ComparisonFailed(s"Value: $ratio does not meet the constraint requirement.")
+      }
+    }
+  }
+}
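
To make the match ratio in finalAssertion concrete, here is a rough model of it on plain Scala collections, using the dataframes A and B from the doc comment. It is only a sketch: it does not use Spark, the `Row` alias and `matchRatio` name are made up for illustration, and it assumes the key columns are unique (which areKeyColumnsValid checks) so that each row of the first dataset joins with at most one row of the second.

```scala
object MatchRatioSketch {
  // A row is modelled as column name -> value; this stands in for a Spark Row.
  type Row = Map[String, String]

  // Fraction of rows in ds1 that have a row in ds2 agreeing on every mapped column pair.
  // This mirrors the inner join on the merged key and comparison columns.
  def matchRatio(ds1: Seq[Row], ds2: Seq[Row], mergedMaps: Map[String, String]): Double = {
    val matched = ds1.count { r1 =>
      ds2.exists(r2 => mergedMaps.forall { case (c1, c2) => r1(c1) == r2(c2) })
    }
    matched.toDouble / ds1.size
  }

  val dfA: Seq[Row] = Seq(
    Map("ID" -> "1", "City" -> "New York", "State" -> "NY"),
    Map("ID" -> "2", "City" -> "Chicago", "State" -> "IL"),
    Map("ID" -> "3", "City" -> "Boston", "State" -> "MA")
  )

  val dfB: Seq[Row] = Seq(
    Map("CityID" -> "1", "City" -> "New York", "State" -> "New York"),
    Map("CityID" -> "2", "City" -> "Chicago", "State" -> "Illinois"),
    Map("CityID" -> "3", "City" -> "Boston", "State" -> "Massachusetts")
  )

  def main(args: Array[String]): Unit = {
    val keys = Map("ID" -> "CityID")
    // Key columns plus the City column only.
    val cityOnly = matchRatio(dfA, dfB, keys ++ Map("City" -> "City"))
    // Key columns plus every non-key column.
    val allCols = matchRatio(dfA, dfB, keys ++ Map("City" -> "City", "State" -> "State"))
    println(s"City only: $cityOnly, passes _ > 0.8: ${cityOnly > 0.8}")
    println(s"All columns: $allCols, passes _ > 0.8: ${allCols > 0.8}")
  }
}
```

With the City column alone all three rows agree, so the ratio is 1.0 and the assertion `_ > 0.8` holds. Once State is included no row agrees, the ratio drops to 0.0, and the assertion fails.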
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.comparison
+
+import org.apache.spark.sql.DataFrame
+
+object ReferentialIntegrity {
+
+  /**
+   * Checks to what extent a column from a DataFrame is a subset of another column
+   * from another DataFrame.
+   *
+   * This is an experimental utility.
+   *
+   * @param primary      The primary data set, which contains the column the customer
+   *                     selects for the referential integrity check.
+   * @param primaryCol   The name of the column selected from the primary data set.
+   * @param reference    The reference data set which contains the possible values for the column
+   *                     from the primary dataset.
+   * @param referenceCol The name of the column selected from the reference data set, which
+   *                     contains those values.
+   * @param assertion    A function which accepts the match ratio and returns a Boolean.
+   *
+   * @return ComparisonResult Internally we calculate the referential integrity as a
+   *                          ratio, and we run the assertion on that outcome.
+   *                          The response is then converted to ComparisonResult.
+   */
+
+  def subsetCheck(primary: DataFrame,
+                  primaryCol: String,
+                  reference: DataFrame,
+                  referenceCol: String,
+                  assertion: Double => Boolean): ComparisonResult = {
+    val primaryCount = primary.count()
+
+    if (!primary.columns.contains(primaryCol)) {
+      ComparisonFailed(s"Column $primaryCol does not exist in primary data frame.")
+    } else if (!reference.columns.contains(referenceCol)) {
+      ComparisonFailed(s"Column $referenceCol does not exist in reference data frame.")
+    } else if (primaryCount == 0) {
+      ComparisonFailed(s"Primary data frame contains no data.")
+    } else {
+      val primarySparkCol = primary.select(primaryCol)
+      val referenceSparkCol = reference.select(referenceCol)
+      val mismatchCount = primarySparkCol.except(referenceSparkCol).count()
+
+      val ratio = if (mismatchCount == 0) 1.0 else (primaryCount - mismatchCount).toDouble / primaryCount
+
+      if (assertion(ratio)) {
+        ComparisonSucceeded()
+      } else {
+        ComparisonFailed(s"Value: $ratio does not meet the constraint requirement.")
+      }
+    }
+  }
+}
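
The ratio computed in subsetCheck can be modelled on plain Scala collections as follows. This is a sketch under stated assumptions rather than the commit's code: it does not use Spark, the `subsetRatio` name and the sample values are made up for illustration, and null handling is ignored. Spark's `Dataset.except` behaves like SQL `EXCEPT DISTINCT`, so the sketch counts mismatches over distinct primary values while the denominator is the full row count.

```scala
object SubsetRatioSketch {
  // Models the ratio from subsetCheck for a single column of string values.
  // Assumes `primary` is non-empty, as subsetCheck rejects an empty primary data frame.
  def subsetRatio(primary: Seq[String], reference: Seq[String]): Double = {
    val referenceValues = reference.toSet
    // Distinct primary values that are absent from the reference column.
    val mismatchCount = primary.distinct.count(v => !referenceValues.contains(v))
    if (mismatchCount == 0) 1.0
    else (primary.size - mismatchCount).toDouble / primary.size
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: "TX" does not appear in the reference column.
    val primary = Seq("NY", "IL", "MA", "TX")
    val reference = Seq("NY", "IL", "MA")
    val ratio = subsetRatio(primary, reference)
    println(s"ratio = $ratio, passes _ >= 0.7: ${ratio >= 0.7}")
  }
}
```

Here one of the four primary values is missing from the reference column, so the ratio is 0.75.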
