#200: Use Circe as serde library for JSON in Agent, Server and Model modules

* added the Circe dependency and started using it
* switched to Circe syntax in the flow classes
* removed the Play JSON dependencies and the PlayJsonImplicits class
* revised the arrayPutUsing string implicit conversion
* removed json4s and replaced its usages
* defined the JSON implicits in the companion objects of the affected DTOs (see the sketch below)
* removed the jacksonModuleScala dependency
* redefined the ResultValueType custom types
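A minimal sketch of the companion-object codec pattern mentioned above, assuming semi-automatic Circe derivation; the DTO name and fields here are invented for illustration and are not taken from this commit:

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

// Hypothetical DTO; the real DTOs in the model module have their own fields.
final case class ExampleDTO(name: String, count: Long)

object ExampleDTO {
  // Placing the codecs in the companion object puts them in implicit scope
  // wherever ExampleDTO is used, so call sites need no extra imports.
  implicit val encoder: Encoder[ExampleDTO] = deriveEncoder[ExampleDTO]
  implicit val decoder: Decoder[ExampleDTO] = deriveDecoder[ExampleDTO]
}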

---------

Co-authored-by: Pavel Salamon <salamonpavel@gmail.com>
Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com>
3 people authored Jul 11, 2024
1 parent dcefb8c commit 663e2a3
Showing 55 changed files with 451 additions and 644 deletions.
@@ -22,7 +22,7 @@ import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.utils.SerializationUtils
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._

class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Logging {
import HttpDispatcher._
@@ -47,19 +47,17 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
val request = commonAtumRequest
.post(createPartitioningEndpoint)
.body(SerializationUtils.asJson(partitioning))
.body(partitioning.asJsonString)

val response = backend.send(request)

SerializationUtils.fromJson[AtumContextDTO](
handleResponseBody(response)
)
handleResponseBody(response).as[AtumContextDTO]
}

override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
val request = commonAtumRequest
.post(createCheckpointEndpoint)
.body(SerializationUtils.asJson(checkpoint))
.body(checkpoint.asJsonString)

val response = backend.send(request)

@@ -69,7 +67,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
override protected[agent] def saveAdditionalData(additionalDataSubmitDTO: AdditionalDataSubmitDTO): Unit = {
val request = commonAtumRequest
.post(createAdditionalDataEndpoint)
.body(SerializationUtils.asJson(additionalDataSubmitDTO))
.body(additionalDataSubmitDTO.asJsonString)

val response = backend.send(request)

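The hunks above swap SerializationUtils calls for the asJsonString and as[T] extension methods imported from JsonSyntaxExtensions. A rough sketch of how such extensions can be built on Circe follows; the method names mirror the diff, but the details (compact noSpaces output, throwing on decode failure) are assumptions rather than the project's actual implementation:

import io.circe.{Decoder, Encoder}
import io.circe.parser.decode
import io.circe.syntax._

object JsonSyntaxExtensionsSketch {
  implicit class SerializationOps[T](val obj: T) extends AnyVal {
    // Render any value that has an Encoder in scope as a compact JSON string.
    def asJsonString(implicit encoder: Encoder[T]): String = obj.asJson.noSpaces
  }

  implicit class DeserializationOps(val json: String) extends AnyVal {
    // Decode the string into T, failing fast if the payload does not parse.
    def as[T](implicit decoder: Decoder[T]): T =
      decode[T](json).fold(error => throw error, identity)
  }
}

With import JsonSyntaxExtensionsSketch._ in scope, partitioning.asJsonString and handleResponseBody(response).as[AtumContextDTO] read exactly like the call sites above.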
12 changes: 6 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.types.{DataType, DecimalType, LongType, StringType}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.atum.agent.core.MeasurementProcessor
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.model.ResultValueType

/**
* Type of different measures to be applied to the columns.
@@ -57,7 +57,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq.empty
override val resultValueType: ResultValueType = ResultValueType.Long
override val resultValueType: ResultValueType = ResultValueType.LongValue
}
object RecordCount {
private[agent] val measureName: String = "count"
@@ -76,7 +76,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = measuredCols
override val resultValueType: ResultValueType = ResultValueType.Long
override val resultValueType: ResultValueType = ResultValueType.LongValue
}
object DistinctRecordCount {
private[agent] val measureName: String = "distinctCount"
@@ -93,7 +93,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType = ResultValueType.BigDecimal
override val resultValueType: ResultValueType = ResultValueType.BigDecimalValue
}
object SumOfValuesOfColumn {
private[agent] val measureName: String = "aggregatedTotal"
@@ -110,7 +110,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType = ResultValueType.BigDecimal
override val resultValueType: ResultValueType = ResultValueType.BigDecimalValue
}
object AbsSumOfValuesOfColumn {
private[agent] val measureName: String = "absAggregatedTotal"
@@ -125,7 +125,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType = ResultValueType.String
override val resultValueType: ResultValueType = ResultValueType.StringValue
}
object SumOfHashesOfColumn {
private[agent] val measureName: String = "hashCrc32"
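The renamed constants above (LongValue, BigDecimalValue, StringValue) together with the new import za.co.absa.atum.model.ResultValueType suggest the redefined type is a sealed ADT roughly along these lines; this is inferred from the call sites in the diff, not copied from the actual file:

package za.co.absa.atum.model

// Assumed shape of the redefined ResultValueType: the case-object names come
// from this diff, while the sealed-trait structure is an assumption.
sealed trait ResultValueType

object ResultValueType {
  case object StringValue     extends ResultValueType
  case object LongValue       extends ResultValueType
  case object DoubleValue     extends ResultValueType
  case object BigDecimalValue extends ResultValueType
}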
@@ -17,7 +17,7 @@
package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.model.ResultValueType

/**
* This trait defines a contract for a measure result.
@@ -80,13 +80,13 @@ object MeasureResult {
resultValue match {

case l: Long =>
MeasureResultProvided[Long](l, ResultValueType.Long)
MeasureResultProvided[Long](l, ResultValueType.LongValue)
case d: Double =>
MeasureResultProvided[Double](d, ResultValueType.Double)
MeasureResultProvided[Double](d, ResultValueType.DoubleValue)
case bd: BigDecimal =>
MeasureResultProvided[BigDecimal](bd, ResultValueType.BigDecimal)
MeasureResultProvided[BigDecimal](bd, ResultValueType.BigDecimalValue)
case s: String =>
MeasureResultProvided[String](s, ResultValueType.String)
MeasureResultProvided[String](s, ResultValueType.StringValue)

case unsupportedType =>
val className = unsupportedType.getClass.getSimpleName
@@ -17,6 +17,7 @@
package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue
import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}

/**
@@ -41,7 +42,7 @@ private [agent] object MeasurementBuilder {
val measureDTO = MeasureDTO(measure.measureName, measure.measuredColumns)

val measureResultDTO = MeasureResultDTO(
MeasureResultDTO.TypedValue(measureResult.resultValue.toString, measureResult.resultValueType)
TypedValue(measureResult.resultValue.toString, measureResult.resultValueType)
)
MeasurementDTO(measureDTO, measureResultDTO)
}
@@ -25,8 +25,8 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

class AtumContextUnitTests extends AnyFlatSpec with Matchers {

@@ -100,7 +100,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
assert(argument.getValue.author == authorTest)
assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argument.getValue.measurements.head.result.mainValue.value == "3")
assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long)
assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
}

"createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in {
@@ -115,7 +115,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
val measurements: Map[Measure, MeasureResult] = Map(
RecordCount("col") -> MeasureResult(1L),
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1)),
UnknownMeasure("customMeasureName", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1))
UnknownMeasure("customMeasureName", Seq("col"), ResultValueType.BigDecimalValue) -> MeasureResult(BigDecimal(1))
)

atumContext.createCheckpointOnProvidedData(
@@ -172,7 +172,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
assert(argumentFirst.getValue.author == authorTest)
assert(argumentFirst.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4")
assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long)
assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)

atumContext.addMeasure(SumOfValuesOfColumn("columnForSum"))
when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user
@@ -185,7 +185,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
assert(argumentSecond.getValue.author == authorTest + "Another")
assert(argumentSecond.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5")
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimal)
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue)
}

"addAdditionalData" should "add key/value pair to map for additional data" in {
@@ -23,7 +23,7 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.model.ResultValueType
import za.co.absa.spark.commons.test.SparkTestBase

class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>
@@ -94,17 +94,17 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase

// Assertions
assert(dfPersonCntResult.resultValue == "1000")
assert(dfPersonCntResult.resultValueType == ResultValueType.Long)
assert(dfPersonCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullCntResult.resultValue == "1000")
assert(dfFullCntResult.resultValueType == ResultValueType.Long)
assert(dfFullCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullSalaryAbsSumResult.resultValue == "2987144")
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullHashResult.resultValue == "2044144307532")
assert(dfFullHashResult.resultValueType == ResultValueType.String)
assert(dfFullHashResult.resultValueType == ResultValueType.StringValue)
assert(dfExtraPersonSalarySumResult.resultValue == "2986144")
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullSalarySumResult.resultValue == "2987144")
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
}

"AbsSumOfValuesOfColumn" should "return expected value" in {
@@ -119,7 +119,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = salaryAbsSum.function(df)

assert(result.resultValue == "300.3")
assert(result.resultValueType == ResultValueType.BigDecimal)
assert(result.resultValueType == ResultValueType.BigDecimalValue)
}

"AbsSumOfValuesOfColumn" should "return expected value for null result" in {
@@ -134,7 +134,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = salaryAbsSum.function(df)

assert(result.resultValue == "0")
assert(result.resultValueType == ResultValueType.BigDecimal)
assert(result.resultValueType == ResultValueType.BigDecimalValue)
}

"RecordCount" should "return expected value" in {
@@ -149,7 +149,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = distinctCount.function(df)

assert(result.resultValue == "4")
assert(result.resultValueType == ResultValueType.Long)
assert(result.resultValueType == ResultValueType.LongValue)
}

"DistinctRecordCount" should "return expected value for multiple columns" in {
@@ -164,7 +164,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = distinctCount.function(df)

assert(result.resultValue == "3")
assert(result.resultValueType == ResultValueType.Long)
assert(result.resultValueType == ResultValueType.LongValue)
}

"DistinctRecordCount" should "fail requirements when no control columns given" in {
@@ -183,7 +183,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = distinctCount.function(df)

assert(result.resultValue == "4")
assert(result.resultValueType == ResultValueType.BigDecimal)
assert(result.resultValueType == ResultValueType.BigDecimalValue)
}

}
@@ -21,9 +21,9 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{AbsSumOfValuesOfColumn, RecordCount, SumOfHashesOfColumn, SumOfValuesOfColumn}
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.atum.agent.AtumContext._
import za.co.absa.atum.model.ResultValueType

class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>

@@ -92,17 +92,17 @@ class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { se

// Assertions
assert(dfPersonCntResult.resultValue == "1000")
assert(dfPersonCntResult.resultValueType == ResultValueType.Long)
assert(dfPersonCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullCntResult.resultValue == "1000")
assert(dfFullCntResult.resultValueType == ResultValueType.Long)
assert(dfFullCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullSalaryAbsSumResult.resultValue == "2987144")
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullHashResult.resultValue == "2044144307532")
assert(dfFullHashResult.resultValueType == ResultValueType.String)
assert(dfFullHashResult.resultValueType == ResultValueType.StringValue)
assert(dfExtraPersonSalarySumResult.resultValue == "2986144")
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullSalarySumResult.resultValue == "2987144")
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
}

}
@@ -20,7 +20,8 @@ import org.scalatest.flatspec.AnyFlatSpec
import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue}
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue

class MeasurementBuilderUnitTests extends AnyFlatSpec {

@@ -35,7 +36,7 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col"))

val expectedMeasureResultDTO = MeasureResultDTO(
TypedValue("1", ResultValueType.BigDecimal)
TypedValue("1", ResultValueType.BigDecimalValue)
)

assert(measurementDTO.measure == expectedMeasureDTO)
@@ -49,7 +50,7 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
val measureResult = MeasureResult(BigDecimal(3.14))
val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measure, measureResult)

val expectedTypedValue = TypedValue("3.14", ResultValueType.BigDecimal)
val expectedTypedValue = TypedValue("3.14", ResultValueType.BigDecimalValue)

assert(measurementDTO.result.mainValue == expectedTypedValue)
}
@@ -59,11 +60,11 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
"when Measurement provided" in {

val measure = SumOfValuesOfColumn("col")
val measureResult = MeasureResult("stringValue", ResultValueType.BigDecimal)
val measureResult = MeasureResult("string", ResultValueType.BigDecimalValue)

val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measure, measureResult)

val expectedTypedValue = TypedValue("stringValue", ResultValueType.BigDecimal)
val expectedTypedValue = TypedValue("string", ResultValueType.BigDecimalValue)

assert(measurementDTO.result.mainValue == expectedTypedValue)
}
@@ -72,14 +73,14 @@
"build MeasurementDTO for BigDecimal type of result value when measured by Agent" in {

val measure = SumOfValuesOfColumn("col")
val measureResult = MeasureResult("1", ResultValueType.BigDecimal)
val measureResult = MeasureResult("1", ResultValueType.BigDecimalValue)

val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measure, measureResult)

val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col"))

val expectedMeasureResultDTO = MeasureResultDTO(
TypedValue("1", ResultValueType.BigDecimal)
TypedValue("1", ResultValueType.BigDecimalValue)
)

assert(measurementDTO.measure == expectedMeasureDTO)
@@ -88,25 +89,25 @@

"buildAndValidateMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements: Map[Measure, MeasureResult] = Map(
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.Long),
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.LongValue),
SumOfValuesOfColumn("col1") -> MeasureResult(BigDecimal(1.2)),
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3)),
UnknownMeasure("unknownMeasure", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1.1))
UnknownMeasure("unknownMeasure", Seq("col"), ResultValueType.BigDecimalValue) -> MeasureResult(BigDecimal(1.1))
)
val measurementDTOs = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)

val expectedMeasurementDTO = Set(
MeasurementDTO(
MeasureDTO("distinctCount", Seq("col")), MeasureResultDTO(TypedValue("1", ResultValueType.Long))
MeasureDTO("distinctCount", Seq("col")), MeasureResultDTO(TypedValue("1", ResultValueType.LongValue))
),
MeasurementDTO(
MeasureDTO("aggregatedTotal", Seq("col1")), MeasureResultDTO(TypedValue("1.2", ResultValueType.BigDecimal))
MeasureDTO("aggregatedTotal", Seq("col1")), MeasureResultDTO(TypedValue("1.2", ResultValueType.BigDecimalValue))
),
MeasurementDTO(
MeasureDTO("aggregatedTotal", Seq("col2")), MeasureResultDTO(TypedValue("1.3", ResultValueType.BigDecimal))
MeasureDTO("aggregatedTotal", Seq("col2")), MeasureResultDTO(TypedValue("1.3", ResultValueType.BigDecimalValue))
),
MeasurementDTO(
MeasureDTO("unknownMeasure", Seq("col")), MeasureResultDTO(TypedValue("1.1", ResultValueType.BigDecimal))
MeasureDTO("unknownMeasure", Seq("col")), MeasureResultDTO(TypedValue("1.1", ResultValueType.BigDecimalValue))
)
)

@@ -149,7 +150,7 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
val measure = SumOfValuesOfColumn("col")

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("stringValue", ResultValueType.String)))
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("string", ResultValueType.StringValue)))
)
}
}
