diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala index 0580f844e..7b3a929ad 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala @@ -56,4 +56,5 @@ abstract class Dispatcher(config: Config) { partitioning: PartitioningDTO, additionalDataPatchDTO: AdditionalDataPatchDTO ): AdditionalDataDTO + } diff --git a/build.sbt b/build.sbt index 0c2f6b1ee..2a638abd1 100644 --- a/build.sbt +++ b/build.sbt @@ -121,3 +121,4 @@ lazy val reader = (projectMatrix in file("reader")) ) .addScalaCrossBuild(Setup.clientSupportedScalaVersions, Dependencies.readerDependencies) .dependsOn(model) +// .dependsOn(agent) diff --git a/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala b/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala new file mode 100644 index 000000000..2fdf9cfa8 --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala @@ -0,0 +1,37 @@ +package za.co.absa.atum.model.types + +import za.co.absa.atum.model.dto.{AdditionalDataDTO, PartitionDTO, PartitioningDTO} + +import scala.collection.immutable.ListMap + +object BasicTypes { + type AtumPartitions = ListMap[String, String] + type AdditionalData = Map[String, Option[String]] + + /** + * Object contains helper methods to work with Atum partitions. + */ + object AtumPartitions { + def apply(elems: (String, String)): AtumPartitions = { + ListMap(elems) + } + + def apply(elems: List[(String, String)]): AtumPartitions = { + ListMap(elems: _*) + } + + def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = { + atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq + } + + def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = { + AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList) + } + } + + object AdditionalData { + def transformAdditionalDataDTO(additionalDataDTO: AdditionalDataDTO): AdditionalData = { + additionalDataDTO.data.map{ case (k, v) => (k, v.flatMap(_.value)) } + } + } +} diff --git a/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala b/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala new file mode 100644 index 000000000..1de914810 --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala @@ -0,0 +1,15 @@ +package za.co.absa.atum.model.types + +import za.co.absa.atum.model.dto.MeasurementDTO + +import java.time.ZonedDateTime + +case class Checkpoint ( + id: String, + name: String, + author: String, + measuredByAtumAgent: Boolean = false, + processStartTime: ZonedDateTime, + processEndTime: Option[ZonedDateTime], + measurements: Set[MeasurementDTO] +) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala b/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala new file mode 100644 index 000000000..a5206ed19 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala @@ -0,0 +1,60 @@ +package za.co.absa.atum.reader + +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} +import za.co.absa.atum.model.types.BasicTypes.AtumPartitions + +import java.time.ZonedDateTime +import java.util.UUID + +class Dispatcher { + + /** + * This method is used to get the Additional data from the server. + * Mock method to return AdditionalDataDTO + * @param partitioning : Partitioning to obtain ID for. + * @return AdditionalDataDTO. + */ + def getAdditionalData(partitioning: AtumPartitions): AdditionalDataDTO = { + AdditionalDataDTO( + data = Map( + "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author1")), + "key2" -> None + ) + ) + } + + /** + * This method is used to get the Checkpoints from the server. + * + * @param partitioning : Partitioning to obtain checkpoints for. + * @param limit : Limit of checkpoints to return. + * @param offset : Offset of checkpoints to return. + * @param checkpointName : Name of the checkpoint to return. + * @return List of CheckpointV2DTO. + */ + def getCheckpoints( + partitioning: AtumPartitions, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String]): Seq[CheckpointV2DTO] = { + Seq( + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint1", + author = "author1", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.now(), + processEndTime = Some(ZonedDateTime.now().plusHours(1)), + measurements = Set.empty + ), + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint2", + author = "author2", + processStartTime = ZonedDateTime.now().minusDays(1), + processEndTime = None, + measurements = Set.empty + ) + ) + } +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index d1153e4b5..7a0bcae56 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -16,9 +16,98 @@ package za.co.absa.atum.reader -class PartitioningReader { - def foo(): String = { - // just to have some testable content - "bar" +import cats.Monad +import cats.implicits.{toFlatMapOps, toFunctorOps} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.model.types.BasicTypes.AdditionalData.transformAdditionalDataDTO +import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} +import za.co.absa.atum.model.types.Checkpoint +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax +import za.co.absa.atum.reader.server.GenericServerConnection + +//import scala.language.higherKinds +// +//class PartitioningReader[F[_]: Monad](partitioning: AtumPartitions)( +// implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) { +// +// /** +// * Fetches additional data for the given partitioning. +// * @param partitioning The partitioning for which to fetch additional data. +// * @return AdditionalData containing the additional data. +// */ +// def getAdditionalData: F[AdditionalData] = { +// Monad[F].pure(dispatcher.getAdditionalData(partitioning).data.map { +// case (key, Some(itemDTO)) => key -> Some(itemDTO.value.get) +// case (key, None) => key -> None +// }) +// } +// +// /** +// * Fetches checkpoints for the given partitioning. +// * @param partitioning The partitioning for which to fetch checkpoints. +// * @return List of CheckpointDTO containing the checkpoints. +// */ +// def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = { +// Monad[F].pure(dispatcher.getCheckpoints(partitioning, limit, offset, checkpointName).map { dto => +// Checkpoint( +// id = dto.id.toString, +// name = dto.name, +// author = dto.author, +// measuredByAtumAgent = dto.measuredByAtumAgent, +// processStartTime = dto.processStartTime, +// processEndTime = dto.processEndTime, +// measurements = dto.measurements +// ) +// }.toList) +// } +// +//} +// +//object PartitioningReader { +// def apply[F[_]: Monad](partitioning: AtumPartitions)( +// implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher): PartitioningReader[F] = +// new PartitioningReader[F](partitioning) +//} + + +class PartitioningReader[F[_]: Monad](atumPartitions: AtumPartitions)( + implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) { + + def getAdditionalData: F[AdditionalData] = { + val partitioningDTO = AtumPartitions.toSeqPartitionDTO(atumPartitions) + val encodedPartitioning = partitioningDTO.asBase64EncodedJsonString + + for { + partitioningIdEffect <- serverConnection.query[SingleSuccessResponse[PartitioningWithIdDTO]]( + s"/api/v2/partitionings/?partitioning=${encodedPartitioning}") + + partitioningId = partitioningIdEffect.data.id + + additionalDataEndpoint = s"/api/v2/partitionings/${partitioningId}/additional-data" + additionalDataEffect <- serverConnection.query[SingleSuccessResponse[AdditionalDataDTO]](additionalDataEndpoint) + + additionalData = transformAdditionalDataDTO(additionalDataEffect.data) + } yield additionalData + } + + def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = { + Monad[F].pure(dispatcher.getCheckpoints(partitioning, limit, offset, checkpointName).map { dto => + Checkpoint( + id = dto.id.toString, + name = dto.name, + author = dto.author, + measuredByAtumAgent = dto.measuredByAtumAgent, + processStartTime = dto.processStartTime, + processEndTime = dto.processEndTime, + measurements = dto.measurements + ) + }.toList) } } + +object PartitioningReader { + def apply[F[_]: Monad](partitioning: AtumPartitions)( + implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher): PartitioningReader[F] = + new PartitioningReader[F](partitioning) +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala b/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala new file mode 100644 index 000000000..35425253b --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala @@ -0,0 +1,8 @@ +package za.co.absa.atum.reader.server + +import cats.Monad +import io.circe.Decoder + +abstract class GenericServerConnection[F[_] : Monad](val serverUrl: String) { + def query[R: Decoder](endpointUri: String): F[R] +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala index c4221d8c5..a4f47c6c4 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala @@ -16,11 +16,90 @@ package za.co.absa.atum.reader +import cats.Id +import org.mockito.MockitoSugar import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} +import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} +import za.co.absa.atum.model.types.Checkpoint -class PartitioningReaderUnitTests extends AnyFunSuiteLike { - test("foo") { - val expected = new PartitioningReader().foo() - assert(expected == "bar") +import java.time.ZonedDateTime +import java.util.UUID +import cats.Monad +import scala.collection.immutable.ListMap +//import za.co.absa.atum.reader.server.GenericServerConnection.Dispatcher + +class PartitioningReaderUnitTests extends AnyFunSuiteLike with Matchers with MockitoSugar { + + private implicit val idMonad: Monad[Id] = Monad[Id] + + trait TestContext { + val partitioning: AtumPartitions = ListMap("key1" -> "value1", "key2" -> "value2") + implicit val dispatcher: Dispatcher = mock[Dispatcher] + val reader: PartitioningReader[Id] = new PartitioningReader[Id](partitioning) + } + + protected val additionalDataDTO1: AdditionalDataDTO = AdditionalDataDTO( + Map( + "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author")), + "key2" -> None, + "key3" -> Some(AdditionalDataItemDTO(Some("value3"), "author")) + )) + + test("getAdditionalData should fetch and transform additional data correctly") { + new TestContext { + when(dispatcher.getAdditionalData(partitioning)).thenReturn(additionalDataDTO1) + + val result: Id[AdditionalData] = reader.getAdditionalData + + Map( + "key1" -> Some("value1"), + "key2" -> None, + "key3" -> Some("value3") + ) shouldEqual result + } } + + test("getCheckpoints should fetch and transform checkpoints correctly") { + new TestContext { + val checkpointsDTO: Seq[CheckpointV2DTO] = Seq( + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint1", + author = "author1", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.now(), + processEndTime = Some(ZonedDateTime.now().plusHours(1)), + measurements = Set.empty + ), + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint2", + author = "author2", + measuredByAtumAgent = false, + processStartTime = ZonedDateTime.now().minusDays(1), + processEndTime = None, + measurements = Set.empty + ) + ) + + when(dispatcher.getCheckpoints(partitioning, Some(10), Some(0L), Some("checkpoint1"))).thenReturn(checkpointsDTO) + + val result: Id[List[Checkpoint]] = reader.getCheckpoints(Some(10), Some(0L), Some("checkpoint1")) + + result shouldEqual checkpointsDTO.map { dto => + Checkpoint( + id = dto.id.toString, + name = dto.name, + author = dto.author, + measuredByAtumAgent = dto.measuredByAtumAgent, + processStartTime = dto.processStartTime, + processEndTime = dto.processEndTime, + measurements = dto.measurements + ) + }.toList + } + } + }