Skip to content

Commit

Permalink
#93: Create accessor for Control Measure and other models from the at… (
Browse files Browse the repository at this point in the history
#105)

* #93: Create accessor for Control Measure and other models from the atum context
* Add getControlMeasure method to AtumImplicits
  • Loading branch information
TerezaHrubesova authored Jul 30, 2021
1 parent 43651b4 commit ee6e25e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
10 changes: 10 additions & 0 deletions atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import za.co.absa.atum.core.{Atum, Constants}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.persistence._
import za.co.absa.atum.persistence.hdfs.{ControlMeasuresHdfsLoaderJsonFile, ControlMeasuresHdfsStorerJsonFile}
import za.co.absa.atum.utils.{BuildProperties, DefaultBuildProperties, InfoFile}
Expand Down Expand Up @@ -260,6 +261,15 @@ trait AtumImplicitsBase {
dataset
}

/**
* The method returns ControlMeasure object from the Atum context
* @return - ControlMeasure object containing all the checkpoints up to the current point
*/
def getControlMeasure: ControlMeasure = {
atum.preventNotInitialized()
atum.controlFrameworkState.getControlMeasure
}

/**
* The method returns the number of records in the dataframe calculated during the last checkpoint.
* If record count is absent in the checkpoint measurements, None is returned.
Expand Down
18 changes: 17 additions & 1 deletion atum/src/test/scala/za/co/absa/atum/AtumImplicitsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class AtumImplicitsSpec extends AnyFlatSpec with SparkTestBase with Matchers {
val inputPath: String = TestResources.InputInfo.localPath
val outputPath = "/tmp/json-setAdditionalInfo-test"

val inputData = FileUtils.readFileToString(inputPath)
val inputData: String = FileUtils.readFileToString(inputPath)

val inputControlMeasure: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](inputData)

Expand Down Expand Up @@ -63,4 +63,20 @@ class AtumImplicitsSpec extends AnyFlatSpec with SparkTestBase with Matchers {
spark.disableControlMeasuresTracking()
}

"method getControlMeasure" should "return ControlMeasure object" in {
// Initializing library to hook up to Apache Spark
spark.enableControlMeasuresTracking(Some(inputPath), None)
.setControlMeasuresWorkflow("getControlMeasure")

import spark.implicits._
val df = spark.read.json(Seq(inputData).toDS)

// act
val controlMeasure = df.getControlMeasure

// assert
controlMeasure shouldBe inputControlMeasure

spark.disableControlMeasuresTracking()
}
}

0 comments on commit ee6e25e

Please sign in to comment.