diff --git a/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala b/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala index f14fc5f5..e3a40522 100644 --- a/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala +++ b/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import za.co.absa.atum.core.{Atum, Constants} +import za.co.absa.atum.model.ControlMeasure import za.co.absa.atum.persistence._ import za.co.absa.atum.persistence.hdfs.{ControlMeasuresHdfsLoaderJsonFile, ControlMeasuresHdfsStorerJsonFile} import za.co.absa.atum.utils.{BuildProperties, DefaultBuildProperties, InfoFile} @@ -260,6 +261,15 @@ trait AtumImplicitsBase { dataset } + /** + * The method returns ControlMeasure object from the Atum context + * @return - ControlMeasure object containing all the checkpoints up to the current point + */ + def getControlMeasure: ControlMeasure = { + atum.preventNotInitialized() + atum.controlFrameworkState.getControlMeasure + } + /** * The method returns the number of records in the dataframe calculated during the last checkpoint. * If record count is absent in the checkpoint measurements, None is returned. diff --git a/atum/src/test/scala/za/co/absa/atum/AtumImplicitsSpec.scala b/atum/src/test/scala/za/co/absa/atum/AtumImplicitsSpec.scala index 763d6dc9..614cd034 100644 --- a/atum/src/test/scala/za/co/absa/atum/AtumImplicitsSpec.scala +++ b/atum/src/test/scala/za/co/absa/atum/AtumImplicitsSpec.scala @@ -30,7 +30,7 @@ class AtumImplicitsSpec extends AnyFlatSpec with SparkTestBase with Matchers { val inputPath: String = TestResources.InputInfo.localPath val outputPath = "/tmp/json-setAdditionalInfo-test" - val inputData = FileUtils.readFileToString(inputPath) + val inputData: String = FileUtils.readFileToString(inputPath) val inputControlMeasure: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](inputData) @@ -63,4 +63,20 @@ class AtumImplicitsSpec extends AnyFlatSpec with SparkTestBase with Matchers { spark.disableControlMeasuresTracking() } + "method getControlMeasure" should "return ControlMeasure object" in { + // Initializing library to hook up to Apache Spark + spark.enableControlMeasuresTracking(Some(inputPath), None) + .setControlMeasuresWorkflow("getControlMeasure") + + import spark.implicits._ + val df = spark.read.json(Seq(inputData).toDS) + + // act + val controlMeasure = df.getControlMeasure + + // assert + controlMeasure shouldBe inputControlMeasure + + spark.disableControlMeasuresTracking() + } }