Skip to content

Commit

Permalink
* addressed more PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
benedeki committed Feb 21, 2025
1 parent f4ce4d3 commit 56cfa09
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 17 deletions.
21 changes: 11 additions & 10 deletions reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ import za.co.absa.atum.reader.server.ServerConfig

/**
* This class is a reader that reads data tight to a flow.
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
* @param serverConfig - the Atum server configuration
* @param backend - sttp backend, that will be executing the requests
* @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context
* bounds, as it make the imports easier to follow
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
*
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
* @param serverConfig - the Atum server configuration
* @param backend - sttp backend, that will be executing the requests
* @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context
* bounds, as it make the imports easier to follow
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
*/
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
extends Reader[F] with PartitioningIdProvider[F]{
class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
extends Reader[F] with PartitioningIdProvider[F] {

private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}"
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint)
queryResult.map{ result =>
queryResult.map { result =>
result.map(_.data.id)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.AtumPartitionsOps
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
import RequestResult.RequestResult
import za.co.absa.atum.reader.core.RequestResult.RequestResult

trait PartitioningIdProvider[F[_]] {self: Reader[F] =>
trait PartitioningIdProvider[F[_]] {
self: Reader[F] =>
def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](s"/$Api/$V2/${V2Paths.Partitionings}", Map("partitioning" -> encodedPartitioning))
queryResult.map{ result =>
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](
s"/$Api/$V2/${V2Paths.Partitionings}",
Map("partitioning" -> encodedPartitioning)
)
queryResult.map { result =>
result.map(_.data.id)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ object RequestException {
final case class ParsingException(
message: String,
body: String
) extends RequestException(message)
) extends RequestException(message)

object ParsingException {
def fromCirceError(error: CirceError, body: String): ParsingException = {
ParsingException(error.getMessage, body)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse
import za.co.absa.atum.model.types.basic.{AtumPartitions, AtumPartitionsOps}
import za.co.absa.atum.reader.FlowReaderUnitTests._
import za.co.absa.atum.reader.server.ServerConfig
import za.co.absa.atum.reader.implicits.future.futureMonadError

import java.time.ZonedDateTime
import java.util.UUID
Expand All @@ -46,7 +45,7 @@ class FlowReaderUnitTests extends AnyFunSuiteLike {
"a" -> "b",
"c" -> "d"
))
implicit val server: SttpBackend[Future, Any] = SttpBackendStub.asynchronousFuture
implicit val server: SttpBackend[Identity, Any] = SttpBackendStub.synchronous

val result = new FlowReader(atumPartitions).mainFlowPartitioning
assert(result == atumPartitions)
Expand Down

0 comments on commit 56cfa09

Please sign in to comment.