Skip to content

Commit

Permalink
Merge branch 'release/4.0.0' into feature/99-examples-standalone-in-repo
Browse files Browse the repository at this point in the history
  • Loading branch information
dk1844 authored Aug 2, 2021
2 parents fda91e8 + ee6e25e commit 3db0c56
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
10 changes: 10 additions & 0 deletions atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import za.co.absa.atum.core.{Atum, Constants}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.persistence._
import za.co.absa.atum.persistence.hdfs.{ControlMeasuresHdfsLoaderJsonFile, ControlMeasuresHdfsStorerJsonFile}
import za.co.absa.atum.utils.{BuildProperties, DefaultBuildProperties, InfoFile}
Expand Down Expand Up @@ -260,6 +261,15 @@ trait AtumImplicitsBase {
dataset
}

/**
* The method returns ControlMeasure object from the Atum context
* @return - ControlMeasure object containing all the checkpoints up to the current point
*/
def getControlMeasure: ControlMeasure = {
atum.preventNotInitialized()
atum.controlFrameworkState.getControlMeasure
}

/**
* The method returns the number of records in the dataframe calculated during the last checkpoint.
* If record count is absent in the checkpoint measurements, None is returned.
Expand Down
18 changes: 17 additions & 1 deletion atum/src/test/scala/za/co/absa/atum/AtumImplicitsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class AtumImplicitsSpec extends AnyFlatSpec with SparkTestBase with Matchers {
val inputPath: String = TestResources.InputInfo.localPath
val outputPath = "/tmp/json-setAdditionalInfo-test"

val inputData = FileUtils.readFileToString(inputPath)
val inputData: String = FileUtils.readFileToString(inputPath)

val inputControlMeasure: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](inputData)

Expand Down Expand Up @@ -63,4 +63,20 @@ class AtumImplicitsSpec extends AnyFlatSpec with SparkTestBase with Matchers {
spark.disableControlMeasuresTracking()
}

"method getControlMeasure" should "return ControlMeasure object" in {
// Initializing library to hook up to Apache Spark
spark.enableControlMeasuresTracking(Some(inputPath), None)
.setControlMeasuresWorkflow("getControlMeasure")

import spark.implicits._
val df = spark.read.json(Seq(inputData).toDS)

// act
val controlMeasure = df.getControlMeasure

// assert
controlMeasure shouldBe inputControlMeasure

spark.disableControlMeasuresTracking()
}
}

0 comments on commit 3db0c56

Please sign in to comment.