#246: Implement basics of PartitioningReader #287

Closed
@@ -56,4 +56,5 @@ abstract class Dispatcher(config: Config) {
    partitioning: PartitioningDTO,
    additionalDataPatchDTO: AdditionalDataPatchDTO
  ): AdditionalDataDTO

}
1 change: 1 addition & 0 deletions build.sbt
@@ -121,3 +121,4 @@ lazy val reader = (projectMatrix in file("reader"))
  )
  .addScalaCrossBuild(Setup.clientSupportedScalaVersions, Dependencies.readerDependencies)
  .dependsOn(model)
  // .dependsOn(agent)
@@ -0,0 +1,37 @@
package za.co.absa.atum.model.types

import za.co.absa.atum.model.dto.{AdditionalDataDTO, PartitionDTO, PartitioningDTO}

import scala.collection.immutable.ListMap

object BasicTypes {
  type AtumPartitions = ListMap[String, String]
  type AdditionalData = Map[String, Option[String]]

  /**
   * Object contains helper methods to work with Atum partitions.
   */
  object AtumPartitions {
    def apply(elems: (String, String)): AtumPartitions = {
      ListMap(elems)
    }

    def apply(elems: List[(String, String)]): AtumPartitions = {
      ListMap(elems: _*)
    }

    def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = {
      atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
    }

    def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = {
      AtumPartitions(partitioning.map(partition => (partition.key, partition.value)).toList)
    }
  }

  object AdditionalData {
    def transformAdditionalDataDTO(additionalDataDTO: AdditionalDataDTO): AdditionalData = {
      additionalDataDTO.data.map { case (k, v) => (k, v.flatMap(_.value)) }
    }
  }
}
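
A short usage sketch (illustrative only, not part of the diff; the object name is made up) showing how the helpers above compose, using the same DTO constructors that appear elsewhere in this PR:

import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO}
import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions}

object BasicTypesUsageExample extends App {
  // Key order is preserved because AtumPartitions is a ListMap.
  val partitions: AtumPartitions = AtumPartitions(List("country" -> "ZA", "day" -> "2024-01-01"))

  // Round-trip through the wire representation.
  val dto = AtumPartitions.toSeqPartitionDTO(partitions)
  assert(AtumPartitions.fromPartitioning(dto) == partitions)

  // Flatten the richer DTO (value + author) into the plain key -> Option[value] view.
  val flattened: AdditionalData = AdditionalData.transformAdditionalDataDTO(
    AdditionalDataDTO(data = Map(
      "owner" -> Some(AdditionalDataItemDTO(Some("team-a"), "author")),
      "note" -> None
    ))
  )
  assert(flattened == Map("owner" -> Some("team-a"), "note" -> None))
}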
@@ -0,0 +1,15 @@
package za.co.absa.atum.model.types

import za.co.absa.atum.model.dto.MeasurementDTO

import java.time.ZonedDateTime

case class Checkpoint(
  id: String,
  name: String,
  author: String,
  measuredByAtumAgent: Boolean = false,
  processStartTime: ZonedDateTime,
  processEndTime: Option[ZonedDateTime],
  measurements: Set[MeasurementDTO]
)
60 changes: 60 additions & 0 deletions reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala
@@ -0,0 +1,60 @@
package za.co.absa.atum.reader

import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO}
import za.co.absa.atum.model.types.BasicTypes.AtumPartitions

import java.time.ZonedDateTime
import java.util.UUID

class Dispatcher {

  /**
   * Gets additional data from the server.
   * Mock implementation that returns a hard-coded AdditionalDataDTO.
   * @param partitioning Partitioning to obtain the additional data for.
   * @return AdditionalDataDTO.
   */
  def getAdditionalData(partitioning: AtumPartitions): AdditionalDataDTO = {
    AdditionalDataDTO(
      data = Map(
        "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author1")),
        "key2" -> None
      )
    )
  }

  /**
   * Gets checkpoints from the server. Mock implementation returning hard-coded checkpoints.
   *
   * @param partitioning Partitioning to obtain checkpoints for.
   * @param limit Maximum number of checkpoints to return.
   * @param offset Offset of the checkpoints to return.
   * @param checkpointName Name of the checkpoint to return.
   * @return Sequence of CheckpointV2DTO.
   */
  def getCheckpoints(
    partitioning: AtumPartitions,
    limit: Option[Int],
    offset: Option[Long],
    checkpointName: Option[String]): Seq[CheckpointV2DTO] = {
    Seq(
      CheckpointV2DTO(
        id = UUID.randomUUID(),
        name = "checkpoint1",
        author = "author1",
        measuredByAtumAgent = true,
        processStartTime = ZonedDateTime.now(),
        processEndTime = Some(ZonedDateTime.now().plusHours(1)),
        measurements = Set.empty
      ),
      CheckpointV2DTO(
        id = UUID.randomUUID(),
        name = "checkpoint2",
        author = "author2",
        processStartTime = ZonedDateTime.now().minusDays(1),
        processEndTime = None,
        measurements = Set.empty
      )
    )
  }
}
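
A quick sketch (not part of the diff; the object name is hypothetical) of driving the stub Dispatcher on its own, to make the canned responses visible:

import za.co.absa.atum.model.types.BasicTypes.AtumPartitions
import za.co.absa.atum.reader.Dispatcher

object DispatcherUsageExample extends App {
  val dispatcher = new Dispatcher
  val partitions: AtumPartitions = AtumPartitions(List("country" -> "ZA"))

  // Both calls currently return the hard-coded data above; real server calls come later.
  val additionalData = dispatcher.getAdditionalData(partitions)
  val checkpoints = dispatcher.getCheckpoints(partitions, limit = Some(10), offset = Some(0L), checkpointName = None)

  println(additionalData.data.keySet)  // Set(key1, key2)
  println(checkpoints.map(_.name))     // List(checkpoint1, checkpoint2)
}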
@@ -16,9 +16,98 @@

package za.co.absa.atum.reader

class PartitioningReader {
def foo(): String = {
// just to have some testable content
"bar"
import cats.Monad
import cats.implicits.{toFlatMapOps, toFunctorOps}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, PartitioningWithIdDTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.BasicTypes.AdditionalData.transformAdditionalDataDTO
import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions}
import za.co.absa.atum.model.types.Checkpoint
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
import za.co.absa.atum.reader.server.GenericServerConnection

//import scala.language.higherKinds
//
//class PartitioningReader[F[_]: Monad](partitioning: AtumPartitions)(
// implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) {
//
// /**
// * Fetches additional data for the given partitioning.
// * @param partitioning The partitioning for which to fetch additional data.
// * @return AdditionalData containing the additional data.
// */
// def getAdditionalData: F[AdditionalData] = {
// Monad[F].pure(dispatcher.getAdditionalData(partitioning).data.map {
// case (key, Some(itemDTO)) => key -> Some(itemDTO.value.get)
// case (key, None) => key -> None
// })
// }
//
// /**
// * Fetches checkpoints for the given partitioning.
// * @param partitioning The partitioning for which to fetch checkpoints.
// * @return List of CheckpointDTO containing the checkpoints.
// */
// def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = {
// Monad[F].pure(dispatcher.getCheckpoints(partitioning, limit, offset, checkpointName).map { dto =>
// Checkpoint(
// id = dto.id.toString,
// name = dto.name,
// author = dto.author,
// measuredByAtumAgent = dto.measuredByAtumAgent,
// processStartTime = dto.processStartTime,
// processEndTime = dto.processEndTime,
// measurements = dto.measurements
// )
// }.toList)
// }
//
//}
//
//object PartitioningReader {
// def apply[F[_]: Monad](partitioning: AtumPartitions)(
// implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher): PartitioningReader[F] =
// new PartitioningReader[F](partitioning)
//}


class PartitioningReader[F[_]: Monad](atumPartitions: AtumPartitions)(
  implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) {

  def getAdditionalData: F[AdditionalData] = {
    val partitioningDTO = AtumPartitions.toSeqPartitionDTO(atumPartitions)
    val encodedPartitioning = partitioningDTO.asBase64EncodedJsonString

    for {
      partitioningIdResponse <- serverConnection.query[SingleSuccessResponse[PartitioningWithIdDTO]](
        s"/api/v2/partitionings/?partitioning=${encodedPartitioning}")

      partitioningId = partitioningIdResponse.data.id

      additionalDataEndpoint = s"/api/v2/partitionings/${partitioningId}/additional-data"
      additionalDataResponse <- serverConnection.query[SingleSuccessResponse[AdditionalDataDTO]](additionalDataEndpoint)

      additionalData = transformAdditionalDataDTO(additionalDataResponse.data)
    } yield additionalData
  }

  def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = {
    Monad[F].pure(dispatcher.getCheckpoints(atumPartitions, limit, offset, checkpointName).map { dto =>
      Checkpoint(
        id = dto.id.toString,
        name = dto.name,
        author = dto.author,
        measuredByAtumAgent = dto.measuredByAtumAgent,
        processStartTime = dto.processStartTime,
        processEndTime = dto.processEndTime,
        measurements = dto.measurements
      )
    }.toList)
  }
}

object PartitioningReader {
  def apply[F[_]: Monad](partitioning: AtumPartitions)(
    implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher): PartitioningReader[F] =
    new PartitioningReader[F](partitioning)
}
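
A wiring sketch (not part of the diff; the stand-in connection and object name are hypothetical) showing how the reader is meant to be instantiated once a concrete GenericServerConnection is available, for example the in-memory one sketched after GenericServerConnection below:

import cats.Id
import io.circe.Decoder
import za.co.absa.atum.model.types.BasicTypes.AtumPartitions
import za.co.absa.atum.reader.{Dispatcher, PartitioningReader}
import za.co.absa.atum.reader.server.GenericServerConnection

object PartitioningReaderWiringExample {
  // Stand-in connection: a real one would call the Atum server at serverUrl and
  // decode the JSON envelope via the R: Decoder evidence.
  implicit val connection: GenericServerConnection[Id] =
    new GenericServerConnection[Id]("http://localhost:8080") {
      override def query[R: Decoder](endpointUri: String): Id[R] =
        throw new NotImplementedError(s"no live server in this sketch: $endpointUri")
    }
  implicit val dispatcher: Dispatcher = new Dispatcher

  val reader: PartitioningReader[Id] =
    PartitioningReader[Id](AtumPartitions(List("country" -> "ZA", "day" -> "2024-01-01")))

  // getCheckpoints only touches the Dispatcher stub, so it runs without a server;
  // getAdditionalData would hit the unimplemented connection above.
  val checkpoints = reader.getCheckpoints(limit = None, offset = None, checkpointName = None)
}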
@@ -0,0 +1,8 @@
package za.co.absa.atum.reader.server

import cats.Monad
import io.circe.Decoder

abstract class GenericServerConnection[F[_] : Monad](val serverUrl: String) {
  def query[R: Decoder](endpointUri: String): F[R]
}
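
A minimal concrete subclass sketch (not part of the diff; the class is hypothetical), assuming responses are supplied as raw JSON strings keyed by endpoint, with io.circe.parser.decode doing the decoding a real HTTP-backed implementation would also need:

import cats.Id
import io.circe.Decoder
import io.circe.parser.decode
import za.co.absa.atum.reader.server.GenericServerConnection

// In-memory connection, e.g. for tests: resolves endpoints from preloaded JSON payloads
// instead of calling serverUrl over HTTP.
class InMemoryServerConnection(serverUrl: String, cannedJson: Map[String, String])
  extends GenericServerConnection[Id](serverUrl) {

  override def query[R: Decoder](endpointUri: String): Id[R] = {
    val json = cannedJson.getOrElse(endpointUri, sys.error(s"no canned response for $endpointUri"))
    decode[R](json).fold(error => sys.error(error.getMessage), identity)
  }
}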
@@ -16,11 +16,90 @@

package za.co.absa.atum.reader

import cats.Id
import org.mockito.MockitoSugar
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO}
import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions}
import za.co.absa.atum.model.types.Checkpoint

class PartitioningReaderUnitTests extends AnyFunSuiteLike {
test("foo") {
val expected = new PartitioningReader().foo()
assert(expected == "bar")
import java.time.ZonedDateTime
import java.util.UUID
import cats.Monad
import scala.collection.immutable.ListMap
//import za.co.absa.atum.reader.server.GenericServerConnection.Dispatcher
import za.co.absa.atum.reader.server.GenericServerConnection

class PartitioningReaderUnitTests extends AnyFunSuiteLike with Matchers with MockitoSugar {

  private implicit val idMonad: Monad[Id] = Monad[Id]

  trait TestContext {
    val partitioning: AtumPartitions = ListMap("key1" -> "value1", "key2" -> "value2")
    implicit val dispatcher: Dispatcher = mock[Dispatcher]
    // The reader's constructor also requires a server connection; these tests only exercise the dispatcher path.
    implicit val serverConnection: GenericServerConnection[Id] = mock[GenericServerConnection[Id]]
    val reader: PartitioningReader[Id] = new PartitioningReader[Id](partitioning)
  }

  protected val additionalDataDTO1: AdditionalDataDTO = AdditionalDataDTO(
    Map(
      "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author")),
      "key2" -> None,
      "key3" -> Some(AdditionalDataItemDTO(Some("value3"), "author"))
    ))

  test("getAdditionalData should fetch and transform additional data correctly") {
    new TestContext {
      when(dispatcher.getAdditionalData(partitioning)).thenReturn(additionalDataDTO1)

      val result: Id[AdditionalData] = reader.getAdditionalData

      Map(
        "key1" -> Some("value1"),
        "key2" -> None,
        "key3" -> Some("value3")
      ) shouldEqual result
    }
  }

  test("getCheckpoints should fetch and transform checkpoints correctly") {
    new TestContext {
      val checkpointsDTO: Seq[CheckpointV2DTO] = Seq(
        CheckpointV2DTO(
          id = UUID.randomUUID(),
          name = "checkpoint1",
          author = "author1",
          measuredByAtumAgent = true,
          processStartTime = ZonedDateTime.now(),
          processEndTime = Some(ZonedDateTime.now().plusHours(1)),
          measurements = Set.empty
        ),
        CheckpointV2DTO(
          id = UUID.randomUUID(),
          name = "checkpoint2",
          author = "author2",
          measuredByAtumAgent = false,
          processStartTime = ZonedDateTime.now().minusDays(1),
          processEndTime = None,
          measurements = Set.empty
        )
      )

      when(dispatcher.getCheckpoints(partitioning, Some(10), Some(0L), Some("checkpoint1"))).thenReturn(checkpointsDTO)

      val result: Id[List[Checkpoint]] = reader.getCheckpoints(Some(10), Some(0L), Some("checkpoint1"))

      result shouldEqual checkpointsDTO.map { dto =>
        Checkpoint(
          id = dto.id.toString,
          name = dto.name,
          author = dto.author,
          measuredByAtumAgent = dto.measuredByAtumAgent,
          processStartTime = dto.processStartTime,
          processEndTime = dto.processEndTime,
          measurements = dto.measurements
        )
      }.toList
    }
  }
}