Skip to content

Commit 79b0ef7

Browse files
authored
Verify that non-key columns exist in each dataset (#517)
* Verify that non-key columns exist in each dataset - Missing non-key columns previously resulted in a Spark SQL exception instead of a graceful "ComparisonFailed" return value. - A future PR will consolidate all the column validation logic into a single place. * Fix failing build due to formatting issues.
1 parent 32e1155 commit 79b0ef7

File tree

2 files changed

+182
-27
lines changed

2 files changed

+182
-27
lines changed

src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala

+65-27
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ object DataSynchronization extends ComparisonBase {
9898
if (columnErrors.isEmpty) {
9999
// Get all the non-key columns from DS1 and verify that they are present in DS2
100100
val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
101-
val nonKeyColsMatch = colsDS1.forall { col => Try { ds2(col) }.isSuccess }
101+
val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _))
102102

103103
if (!nonKeyColsMatch) {
104104
ComparisonFailed("Non key columns in the given data frames do not match.")
@@ -131,12 +131,23 @@ object DataSynchronization extends ComparisonBase {
131131
colKeyMap: Map[String, String],
132132
compCols: Map[String, String],
133133
assertion: Double => Boolean): ComparisonResult = {
134-
val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
135-
if (columnErrors.isEmpty) {
136-
val mergedMaps = colKeyMap ++ compCols
137-
finalAssertion(ds1, ds2, mergedMaps, assertion)
134+
val keyColumnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
135+
if (keyColumnErrors.isEmpty) {
136+
val nonKeyColumns1NotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
137+
val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _))
138+
139+
if (nonKeyColumns1NotInDataset.nonEmpty) {
140+
ComparisonFailed(s"The following columns were not found in the first dataset: " +
141+
s"${nonKeyColumns1NotInDataset.mkString(", ")}")
142+
} else if (nonKeyColumns2NotInDataset.nonEmpty) {
143+
ComparisonFailed(s"The following columns were not found in the second dataset: " +
144+
s"${nonKeyColumns2NotInDataset.mkString(", ")}")
145+
} else {
146+
val mergedMaps = colKeyMap ++ compCols
147+
finalAssertion(ds1, ds2, mergedMaps, assertion)
148+
}
138149
} else {
139-
ComparisonFailed(columnErrors.get)
150+
ComparisonFailed(keyColumnErrors.get)
140151
}
141152
}
142153

@@ -150,12 +161,27 @@ object DataSynchronization extends ComparisonBase {
150161
val compColsEither: Either[ComparisonFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
151162
optionalCompCols.get match {
152163
case compCols if compCols.isEmpty => Left(ComparisonFailed("Empty column comparison map provided."))
153-
case compCols => Right(compCols)
164+
case compCols =>
165+
val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
166+
val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _))
167+
if (ds1CompColsNotInDataset.nonEmpty) {
168+
Left(
169+
ComparisonFailed(s"The following columns were not found in the first dataset: " +
170+
s"${ds1CompColsNotInDataset.mkString(", ")}")
171+
)
172+
} else if (ds2CompColsNotInDataset.nonEmpty) {
173+
Left(
174+
ComparisonFailed(s"The following columns were not found in the second dataset: " +
175+
s"${ds2CompColsNotInDataset.mkString(", ")}")
176+
)
177+
} else {
178+
Right(compCols)
179+
}
154180
}
155181
} else {
156182
// Get all the non-key columns from DS1 and verify that they are present in DS2
157183
val ds1NonKeyCols = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
158-
val nonKeyColsMatch = ds1NonKeyCols.forall { col => Try { ds2(col) }.isSuccess }
184+
val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _))
159185

160186
if (!nonKeyColsMatch) {
161187
Left(ComparisonFailed("Non key columns in the given data frames do not match."))
@@ -181,30 +207,40 @@ object DataSynchronization extends ComparisonBase {
181207
/**
 * Validates the key-column mapping before any comparison is attempted.
 *
 * Two checks are made, in this order:
 *  1. every key of `colKeyMap` can be looked up in `ds1` and every value in `ds2`;
 *  2. the key columns uniquely identify rows, i.e. grouping each dataset by its key columns
 *     yields exactly as many groups as the dataset has rows.
 *
 * @param ds1 the first dataset
 * @param ds2 the second dataset
 * @param colKeyMap key column names of `ds1` mapped to the matching key column names of `ds2`
 * @return `None` when the key columns are valid, otherwise `Some` containing the error message
 */
private def areKeyColumnsValid(ds1: DataFrame,
                               ds2: DataFrame,
                               colKeyMap: Map[String, String]): Option[String] = {
  val keyNames1 = colKeyMap.keys.toSeq
  val keyNames2 = colKeyMap.values.toSeq

  val absentFromDs1 = keyNames1.filterNot(name => columnExists(ds1, name))
  val absentFromDs2 = keyNames2.filterNot(name => columnExists(ds2, name))

  (absentFromDs1, absentFromDs2) match {
    case (missing, _) if missing.nonEmpty =>
      Some(s"The following key columns were not found in the first dataset: ${missing.mkString(", ")}")

    case (_, missing) if missing.nonEmpty =>
      Some(s"The following key columns were not found in the second dataset: ${missing.mkString(", ")}")

    case _ =>
      // The key columns form a valid primary/composite key only if grouping by them
      // produces one group per row, so the grouped count is compared with the raw row count.
      val groupedByKeys1 = ds1.groupBy(keyNames1.map(col): _*).count()
      val groupedByKeys2 = ds2.groupBy(keyNames2.map(col): _*).count()

      val rowCount1 = ds1.count()
      val rowCount2 = ds2.count()
      val distinctKeyCount1 = groupedByKeys1.count()
      val distinctKeyCount2 = groupedByKeys2.count()

      val keysAreUnique = distinctKeyCount1 == rowCount1 && distinctKeyCount2 == rowCount2
      if (keysAreUnique) {
        None
      } else {
        val keyList1 = keyNames1.mkString(", ")
        val keyList2 = keyNames2.mkString(", ")
        Some(s"The selected columns are not comparable due to duplicates present in the dataset." +
          s"Comparison keys must be unique, but " +
          s"in Dataframe 1, there are $distinctKeyCount1 unique records and $rowCount1 rows," +
          s" and " +
          s"in Dataframe 2, there are $distinctKeyCount2 unique records and $rowCount2 rows, " +
          s"based on the combination of keys {$keyList1} in Dataframe 1 and {$keyList2} in Dataframe 2")
      }
  }
}
210246

@@ -291,4 +327,6 @@ object DataSynchronization extends ComparisonBase {
291327
.drop(ds2HashColName)
292328
.drop(ds2KeyColsUpdatedNamesMap.values.toSeq: _*)
293329
}
330+
331+
/**
 * Reports whether `col` can be looked up on `df`.
 *
 * The lookup `df(col)` is wrapped in a `Try`, so a failed lookup yields `false`
 * instead of propagating the exception to the caller.
 */
private def columnExists(df: DataFrame, col: String): Boolean = Try(df(col)).isSuccess
294332
}

src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala

+117
Original file line numberDiff line numberDiff line change
@@ -686,5 +686,122 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
686686
val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
687687
assert(expected == rowLevelResults)
688688
}
689+
"fails as expected when key columns do not exist" in withSparkSession { spark =>
  val idColumnName = "id"
  val ds1 = primaryDataset(spark, idColumnName)
  val ds2 = referenceDataset(spark, idColumnName)
  val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match

  val missingInFirst = "foo"
  val missingInSecond = "bar"
  val firstDatasetError = "key columns were not found in the first dataset"
  val secondDatasetError = "key columns were not found in the second dataset"

  // Runs the overall comparison and then the row-level comparison with the given key mapping,
  // and checks that each one fails with a message containing both the expected fragment and
  // the name of the missing column.
  def expectKeyColumnFailure(keyMap: Map[String, String], fragment: String, missingColumn: String) = {
    // Overall
    val overall = DataSynchronization.columnMatch(ds1, ds2, keyMap, assertion)
    assert(overall.isInstanceOf[ComparisonFailed])
    val overallFailure = overall.asInstanceOf[ComparisonFailed]
    assert(overallFailure.errorMessage.contains(fragment))
    assert(overallFailure.errorMessage.contains(missingColumn))

    // Row level
    val rowLevel = DataSynchronization.columnMatchRowLevel(ds1, ds2, keyMap)
    assert(rowLevel.isLeft)
    val rowLevelFailure = rowLevel.left.get
    assert(rowLevelFailure.errorMessage.contains(fragment))
    assert(rowLevelFailure.errorMessage.contains(missingColumn))
  }

  // Key columns not in either dataset: the first dataset is reported first
  expectKeyColumnFailure(Map(missingInFirst -> missingInSecond), firstDatasetError, missingInFirst)

  // Key column not in first dataset
  expectKeyColumnFailure(Map(missingInFirst -> idColumnName), firstDatasetError, missingInFirst)

  // Key column not in second dataset
  expectKeyColumnFailure(Map(idColumnName -> missingInSecond), secondDatasetError, missingInSecond)
}
747+
"fails as expected when non-key columns do not exist" in withSparkSession { spark =>
  val idColumnName = "id"
  val ds1 = primaryDataset(spark, idColumnName)
  val ds2 = referenceDataset(spark, idColumnName)
  val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
  val colKeyMap = Map(idColumnName -> idColumnName)

  val nonExistCol1 = "foo"
  val nonExistCol2 = "bar"
  val missingInFirst = s"The following columns were not found in the first dataset: $nonExistCol1"
  val missingInSecond = s"The following columns were not found in the second dataset: $nonExistCol2"

  // Runs the overall comparison and then the row-level comparison with the given comparison
  // columns, and checks that each one fails with the expected message. Each message is read
  // from the result of its own call, so the row-level check cannot accidentally re-assert
  // on the overall result.
  def expectNonKeyColumnFailure(compCols: Map[String, String], expectedMessage: String) = {
    // Overall
    val overall = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
    assert(overall.isInstanceOf[ComparisonFailed])
    val overallFailure = overall.asInstanceOf[ComparisonFailed]
    assert(overallFailure.errorMessage.contains(expectedMessage))

    // Row level
    val rowLevel = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compCols))
    assert(rowLevel.isLeft)
    val rowLevelFailure = rowLevel.left.get
    assert(rowLevelFailure.errorMessage.contains(expectedMessage))
  }

  // Non-key columns not in either dataset: the first dataset is reported first
  expectNonKeyColumnFailure(Map(nonExistCol1 -> nonExistCol2), missingInFirst)

  // Non-key column not in first dataset
  // NOTE(review): "State" here vs "state" below differ in case; presumably both resolve
  // because column lookup is case-insensitive in this setup -- confirm against the fixtures.
  expectNonKeyColumnFailure(Map(nonExistCol1 -> "State"), missingInFirst)

  // Non-key column not in second dataset
  expectNonKeyColumnFailure(Map("state" -> nonExistCol2), missingInSecond)
}
689806
}
690807
}

0 commit comments

Comments
 (0)