diff --git a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala index 3119bfb4..031953a0 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala @@ -31,21 +31,22 @@ import za.co.absa.atum.reader.server.ServerConfig /** * This class is a reader that reads data tight to a flow. - * @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning' - * @param serverConfig - the Atum server configuration - * @param backend - sttp backend, that will be executing the requests - * @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context - * bounds, as it make the imports easier to follow - * @tparam F - the effect type (e.g. Future, IO, Task, etc.) + * + * @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning' + * @param serverConfig - the Atum server configuration + * @param backend - sttp backend, that will be executing the requests + * @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context + * bounds, as it make the imports easier to follow + * @tparam F - the effect type (e.g. Future, IO, Task, etc.) */ -class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions) - (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) - extends Reader[F] with PartitioningIdProvider[F]{ +class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions) + (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any]) + extends Reader[F] with PartitioningIdProvider[F] { private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = { val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}" val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint) - queryResult.map{ result => + queryResult.map { result => result.map(_.data.id) } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala index f6f6deb3..3398c740 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala @@ -24,13 +24,17 @@ import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse import za.co.absa.atum.model.types.basic.AtumPartitions import za.co.absa.atum.model.types.basic.AtumPartitionsOps import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax -import RequestResult.RequestResult +import za.co.absa.atum.reader.core.RequestResult.RequestResult -trait PartitioningIdProvider[F[_]] {self: Reader[F] => +trait PartitioningIdProvider[F[_]] { + self: Reader[F] => def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = { val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString - val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](s"/$Api/$V2/${V2Paths.Partitionings}", Map("partitioning" -> encodedPartitioning)) - queryResult.map{ result => + val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]( + s"/$Api/$V2/${V2Paths.Partitionings}", + Map("partitioning" -> encodedPartitioning) + ) + queryResult.map { result => result.map(_.data.id) } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala index 4f713eb5..6aba5f3d 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala @@ -35,7 +35,8 @@ object RequestException { final case class ParsingException( message: String, body: String - ) extends RequestException(message) + ) extends RequestException(message) + object ParsingException { def fromCirceError(error: CirceError, body: String): ParsingException = { ParsingException(error.getMessage, body) diff --git a/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala index 560c6b07..8ae88699 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala @@ -31,7 +31,6 @@ import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import za.co.absa.atum.model.types.basic.{AtumPartitions, AtumPartitionsOps} import za.co.absa.atum.reader.FlowReaderUnitTests._ import za.co.absa.atum.reader.server.ServerConfig -import za.co.absa.atum.reader.implicits.future.futureMonadError import java.time.ZonedDateTime import java.util.UUID @@ -46,7 +45,7 @@ class FlowReaderUnitTests extends AnyFunSuiteLike { "a" -> "b", "c" -> "d" )) - implicit val server: SttpBackend[Future, Any] = SttpBackendStub.asynchronousFuture + implicit val server: SttpBackend[Identity, Any] = SttpBackendStub.synchronous val result = new FlowReader(atumPartitions).mainFlowPartitioning assert(result == atumPartitions)