From 2e0631a7c02c244cc611dbcc8950ccf1e42289da Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Tue, 25 Jun 2024 13:25:26 +0200 Subject: [PATCH] #118: adding support for MultipleResultFunction with status for Doobie (#120) * #118: adding support for MultipleResultFunction with status for Doobie --- README.md | 43 ++++-- .../main/scala/za/co/absa/fadb/DBEngine.scala | 41 ++--- .../scala/za/co/absa/fadb/DBFunction.scala | 143 ++++++++++++++++-- .../main/scala/za/co/absa/fadb/Query.scala | 21 ++- .../co/absa/fadb/status/FunctionStatus.scala | 22 --- .../status/aggregation/StatusAggregator.scala | 53 +++++++ .../ByFirstErrorStatusAggregator.scala | 39 +++++ .../ByFirstRowStatusAggregator.scala | 42 +++++ .../ByMajorityErrorsStatusAggregator.scala | 57 +++++++ .../fadb/status/handling/StatusHandling.scala | 8 +- .../StandardStatusHandling.scala | 6 +- .../UserDefinedStatusHandling.scala | 8 +- .../za/co/absa/fadb/status/package.scala | 50 ++++++ .../za/co/absa/fadb/DBFunctionUnitTests.scala | 4 +- .../status/StatusExceptionUnitTests.scala | 2 +- .../StatusAggregatorUnitTests.scala | 76 ++++++++++ ...yFirstErrorStatusAggregatorUnitTests.scala | 96 ++++++++++++ .../ByFirstRowStatusAggregatorUnitTests.scala | 102 +++++++++++++ ...orityErrorsStatusAggregatorUnitTests.scala | 133 ++++++++++++++++ .../StandardStatusHandlingUnitTests.scala | 21 ++- .../integration/V1.2.99__insert_test_data.sql | 4 +- .../za/co/absa/fadb/doobie/DoobieEngine.scala | 23 +-- .../co/absa/fadb/doobie/DoobieFunction.scala | 85 ++++++++--- .../za/co/absa/fadb/doobie/DoobieQuery.scala | 15 +- .../co/absa/fadb/doobie/StatusWithData.scala | 7 +- ...unctionWithAggStatusIntegrationTests.scala | 100 ++++++++++++ ...ltFunctionWithStatusIntegrationTests.scala | 83 ++++++++++ ...ltFunctionWithStatusIntegrationTests.scala | 2 +- .../za/co/absa/fadb/doobie/DoobieTest.scala | 1 + .../za/co/absa/fadb/slick/SlickFunction.scala | 62 ++++++-- .../za/co/absa/fadb/slick/SlickPgEngine.scala | 6 +- .../za/co/absa/fadb/slick/SlickQuery.scala | 29 ++-- .../slick/OptionalActorSlickConverter.scala | 20 ++- ...ltipleResultFunctionIntegrationTests.scala | 9 +- ...unctionWithAggStatusIntegrationTests.scala | 64 ++++++++ ...ltFunctionWithStatusIntegrationTests.scala | 56 +++++++ .../za/co/absa/fadb/slick/SlickTest.scala | 4 +- 37 files changed, 1360 insertions(+), 177 deletions(-) delete mode 100644 core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala create mode 100644 core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregator.scala create mode 100644 core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregator.scala create mode 100644 core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregator.scala create mode 100644 core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregator.scala create mode 100644 core/src/main/scala/za/co/absa/fadb/status/package.scala create mode 100644 core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregatorUnitTests.scala create mode 100644 core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregatorUnitTests.scala create mode 100644 core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregatorUnitTests.scala create mode 100644 core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregatorUnitTests.scala create mode 100644 doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithAggStatusIntegrationTests.scala create mode 100644 doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusIntegrationTests.scala rename core/src/main/scala/za/co/absa/fadb/FunctionStatusWithData.scala => slick/src/test/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala (57%) create mode 100644 slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithAggStatusIntegrationTests.scala create mode 100644 slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusIntegrationTests.scala diff --git a/README.md b/README.md index a834092d..6ea9782d 100644 --- a/README.md +++ b/README.md @@ -47,14 +47,16 @@ within the application.** --- -Currently, the library is developed with Postgres as the target DB. But the approach is applicable to any DB supporting stored procedure/functions – Oracle, MS-SQL, ... +Currently, the library is developed with Postgres as the target DB. But the approach is applicable to any DB +supporting stored procedure/functions – Oracle, MS-SQL, ... ## Usage #### Sbt -Import one of the two available module at the moment. Slick module works with Scala Futures. Doobie module works with any effect type (typically IO or ZIO) provided cats effect's Async instance is available. +Import one of the two available module at the moment. Slick module works with Scala Futures. Doobie module works with +any effect type (typically IO or ZIO) provided cats effect's Async instance is available. ```scala libraryDependencies *= "za.co.absa.fa-db" %% "slick" % "X.Y.Z" @@ -111,7 +113,8 @@ Text about status codes returned from the database function can be found [here]( ## Slick module -As the name suggests it runs on [Slick library](https://github.com/slick/slick) and also brings in the [Slickpg library](https://github.com/tminglei/slick-pg/) for extended Postgres type support. +As the name suggests it runs on [Slick library](https://github.com/slick/slick) and also brings in the [Slickpg library](https://github.com/tminglei/slick-pg/) for extended +Postgres type support. It brings: @@ -119,8 +122,18 @@ It brings: * `class SlickSingleResultFunction` - abstract class for DB functions returning single result * `class SlickMultipleResultFunction` - abstract class for DB functions returning sequence of results * `class SlickOptionalResultFunction` - abstract class for DB functions returning optional result -* `class SlickSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box) -* `trait FaDbPostgresProfile` - to bring support for Postgres and its extended data types in one class (except JSON, as there are multiple implementations for this data type in _Slick-Pg_) +* `class SlickSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an + implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box) +* `class SlickMultipleResultFunctionWithStatus` - as `SlickSingleResultFunctionWithStatus` but for multiple record + retrieval +* `class SlickMultipleResultFunctionWithAggStatus` - as `SlickMultipleResultFunctionWithStatus` but it aggregates + the statuses into a single record. It requires an implementation of `StatusAggregator` to be mixed-in + (`ByFirstErrorStatusAggregator`, `ByFirstRowStatusAggregator`, and `ByMajorityErrorsStatusAggregator` available + out of the box) +* `class SlickOptionalResultFunctionWithStatus` - as `SlickSingleResultFunctionWithStatus` but the returning record + is optional +* `trait FaDbPostgresProfile` - to bring support for Postgres and its extended data types in one class + (except JSON, as there are multiple implementations for this data type in _Slick-Pg_) * `object FaDbPostgresProfile` - instance of the above trait for direct use #### Known issues @@ -143,7 +156,9 @@ val macAddr: Option[MacAddrString] = pr.nextMacAddrOption ## Doobie module -As the name suggests it runs on [Doobie library](https://tpolecat.github.io/doobie/). The main benefit of the module is that it allows to use any effect type (typically IO or ZIO) therefore is more suitable for functional programming. It also brings in the [Doobie-Postgres library](https://tpolecat.github.io/doobie/docs/14-PostgreSQL.html) for extended Postgres type support. +As the name suggests it runs on [Doobie library](https://tpolecat.github.io/doobie/). The main benefit of the module is that it allows to use any +effect type (typically IO or ZIO) therefore is more suitable for functional programming. +It also brings in the [Doobie-Postgres library](https://tpolecat.github.io/doobie/docs/14-PostgreSQL.html) for extended Postgres type support. It brings: @@ -151,9 +166,19 @@ It brings: * `class DoobieSingleResultFunction` - abstract class for DB functions returning single result * `class DoobieMultipleResultFunction` - abstract class for DB functions returning sequence of results * `class DoobieOptionalResultFunction` - abstract class for DB functions returning optional result -* `class DoobieSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box) - -Since Doobie also interoperates with ZIO, there is an example of how a database connection can be properly established within a ZIO application. Please see [this file](doobie/zio-setup.md) for more details. +* `class DoobieSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires + an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box) +* `class DoobieMultipleResultFunctionWithStatus` - as `DoobieSingleResultFunctionWithStatus` but for multiple record + retrieval +* `class DoobieMultipleResultFunctionWithAggStatus` - as `DoobieMultipleResultFunctionWithStatus` but it aggregates + the statuses into a single record. It requires an implementation of `StatusAggregator` to be mixed-in + (`ByFirstErrorStatusAggregator`, `ByFirstRowStatusAggregator`, and `ByMajorityErrorsStatusAggregator` available + out of the box) +* `class DoobieOptionalResultFunctionWithStatus` - as `DoobieSingleResultFunctionWithStatus` but the returning record + is optional + +Since Doobie also interoperates with ZIO, there is an example of how a database connection can be properly established +within a ZIO application. Please see [this file](doobie/zio-setup.md) for more details. ## Testing diff --git a/core/src/main/scala/za/co/absa/fadb/DBEngine.scala b/core/src/main/scala/za/co/absa/fadb/DBEngine.scala index d282749d..2e9db29d 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBEngine.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBEngine.scala @@ -18,7 +18,7 @@ package za.co.absa.fadb import cats.Monad import cats.implicits.toFunctorOps -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.FailedOrRow import scala.language.higherKinds @@ -38,47 +38,52 @@ abstract class DBEngine[F[_]: Monad] { /** * The actual query executioner of the queries of the engine + * + * Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries. + * * @param query - the query to execute * @tparam R - return type of the query * @return - sequence of the results of database query */ protected def run[R](query: QueryType[R]): F[Seq[R]] - - /** - * The actual query executioner of the queries of the engine with status - * @param query - the query to execute - * @tparam R - return type of the query - * @return - result of database query with status - */ - def runWithStatus[R](query: QueryWithStatusType[R]): F[Either[StatusException, R]] + protected def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]] /** * Public method to execute when query is expected to return multiple results + * + * Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries. + * * @param query - the query to execute * @tparam R - return type of the query * @return - sequence of the results of database query */ - def fetchAll[R](query: QueryType[R]): F[Seq[R]] = { - run(query) - } + def fetchAll[R](query: QueryType[R]): F[Seq[R]] = run(query) + def fetchAllWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]] = + runWithStatus(query) /** * Public method to execute when query is expected to return exactly one row + * + * Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries. + * * @param query - the query to execute * @tparam R - return type of the query * @return - sequence of the results of database query */ - def fetchHead[R](query: QueryType[R]): F[R] = { - run(query).map(_.head) - } + def fetchHead[R](query: QueryType[R]): F[R] = run(query).map(_.head) + def fetchHeadWithStatus[R](query: QueryWithStatusType[R]): F[FailedOrRow[R]] = + runWithStatus(query).map(_.head) /** * Public method to execute when query is expected to return one or no results + * + * Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries. + * * @param query - the query to execute * @tparam R - return type of the query * @return - sequence of the results of database query */ - def fetchHeadOption[R](query: QueryType[R]): F[Option[R]] = { - run(query).map(_.headOption) - } + def fetchHeadOption[R](query: QueryType[R]): F[Option[R]] = run(query).map(_.headOption) + def fetchHeadOptionWithStatus[R](query: QueryWithStatusType[R]): F[Option[FailedOrRow[R]]] = + runWithStatus(query).map(_.headOption) } diff --git a/core/src/main/scala/za/co/absa/fadb/DBFunction.scala b/core/src/main/scala/za/co/absa/fadb/DBFunction.scala index e0fdee69..03becddc 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBFunction.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBFunction.scala @@ -18,8 +18,9 @@ package za.co.absa.fadb import cats.MonadError import cats.implicits.toFlatMapOps -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.aggregation.StatusAggregator import za.co.absa.fadb.status.handling.StatusHandling +import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow, Row} import scala.language.higherKinds @@ -105,13 +106,28 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName)) /** - * Executes the database function and returns multiple results. - * @param values The values to pass over to the database function. - * @return A sequence of results from the database function. - */ - def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Either[StatusException, R]] = { - query(values).flatMap(q => dBEngine.runWithStatus(q)) - } + * Executes the database function and returns multiple results. + * @param values - The values to pass over to the database function. + * @return - A sequence of results from the database function. + */ + protected def multipleResults(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[FailedOrRow[R]]] = + query(values).flatMap(q => dBEngine.fetchAllWithStatus(q)) + + /** + * Executes the database function and returns a single result. + * @param values - The values to pass over to the database function. + * @return - A single result from the database function. + */ + protected def singleResult(values: I)(implicit me: MonadError[F, Throwable]): F[FailedOrRow[R]] = + query(values).flatMap(q => dBEngine.fetchHeadWithStatus(q)) + + /** + * Executes the database function and returns an optional result. + * @param values - The values to pass over to the database function. + * @return - An optional result from the database function. + */ + protected def optionalResult(values: I)(implicit me: MonadError[F, Throwable]): F[Option[FailedOrRow[R]]] = + query(values).flatMap(q => dBEngine.fetchHeadOptionWithStatus(q)) /** * The fields to select from the database function call @@ -132,7 +148,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv protected def query(values: I)(implicit me: MonadError[F, Throwable]): F[dBEngine.QueryWithStatusType[R]] // To be provided by an implementation of QueryStatusHandling - override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] + override def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D] } object DBFunction { @@ -206,4 +222,113 @@ object DBFunction { */ def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[R]] = optionalResult(values) } + + /** + * `DBMultipleResultFunctionWithStatus` is an abstract class that represents a database function returning + * multiple results with status information. + * It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a sequence of results. + */ + abstract class DBMultipleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]]( + functionNameOverride: Option[String] = None + )(implicit schema: DBSchema, dBEngine: E) + extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) { + + // A constructor that takes only the mandatory parameters and uses default values for the optional ones + def this()(implicit schema: DBSchema, dBEngine: E) = this(None) + + // A constructor that allows specifying the function name as a string, but not as an option + def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName)) + + /** + * For easy and convenient execution of the DB function call + * @param values - the values to pass over to the database function + * @return - a sequence of values, each coming from a row returned from the DB function transformed to scala + * type `R` wrapped around with Either, providing StatusException if raised + */ + def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[FailedOrRow[R]]] = multipleResults(values) + } + + /** + * `DBMultipleResultFunctionWithAggStatus` is an abstract class that represents a database function returning + * multiple results with status information. + * It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a sequence of results + * + * It's similar to `DBMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value. + * The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`. + */ + abstract class DBMultipleResultFunctionWithAggStatus[I, R, E <: DBEngine[F], F[_]]( + functionNameOverride: Option[String] = None + )(implicit schema: DBSchema, dBEngine: E) + extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) + with StatusAggregator { + + // A constructor that takes only the mandatory parameters and uses default values for the optional ones + def this()(implicit schema: DBSchema, dBEngine: E) = this(None) + + // A constructor that allows specifying the function name as a string, but not as an option + def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName)) + + /** + * For easy and convenient execution of the DB function call + * @param values - the values to pass over to the database function + * @return - a sequence of values, each coming from a row returned from the DB function transformed to scala + * type `R` wrapped around with Either, providing StatusException if raised + */ + def apply(values: I) + (implicit me: MonadError[F, Throwable]): F[FailedOrRows[R]] = + multipleResults(values).flatMap(data => me.pure(aggregate(data))) + } + + /** + * `DBSingleResultFunctionWithStatus` is an abstract class that represents a database function returning + * a single result with status information. + * It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a single result. + */ + abstract class DBSingleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]]( + functionNameOverride: Option[String] = None + )(implicit schema: DBSchema, dBEngine: E) + extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) { + + // A constructor that takes only the mandatory parameters and uses default values for the optional ones + def this()(implicit schema: DBSchema, dBEngine: E) = this(None) + + // A constructor that allows specifying the function name as a string, but not as an option + def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName)) + + /** + * For easy and convenient execution of the DB function call + * @param values - the values to pass over to the database function + * @return - the value returned from the DB function transformed to scala type `R` + * wrapped around with Either, providing StatusException if raised + */ + def apply(values: I)(implicit me: MonadError[F, Throwable]): F[FailedOrRow[R]] = + singleResult(values) + } + + /** + * `DBOptionalResultFunctionWithStatus` is an abstract class that represents a database function returning + * an optional result with status information. + * It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return an optional result. + */ + abstract class DBOptionalResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]]( + functionNameOverride: Option[String] = None + )(implicit schema: DBSchema, dBEngine: E) + extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) { + + // A constructor that takes only the mandatory parameters and uses default values for the optional ones + def this()(implicit schema: DBSchema, dBEngine: E) = this(None) + + // A constructor that allows specifying the function name as a string, but not as an option + def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName)) + + /** + * For easy and convenient execution of the DB function call + * @param values - the values to pass over to the database function + * @return - the value returned from the DB function transformed to scala type `R` if a row is returned, + * otherwise `None`, wrapped around with Either, providing StatusException if raised + */ + def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[FailedOrRow[R]]] = + optionalResult(values) + } + } diff --git a/core/src/main/scala/za/co/absa/fadb/Query.scala b/core/src/main/scala/za/co/absa/fadb/Query.scala index b73acec7..e6ab348a 100644 --- a/core/src/main/scala/za/co/absa/fadb/Query.scala +++ b/core/src/main/scala/za/co/absa/fadb/Query.scala @@ -16,7 +16,7 @@ package za.co.absa.fadb -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.{FailedOrRow, Row} /** * The basis for all query types of [[DBEngine]] implementations @@ -26,32 +26,31 @@ trait Query[R] /** * The basis for all query types of [[DBEngine]] implementations with status - * @tparam A - the initial result type of the query - * @tparam B - the intermediate result type of the query - * @tparam R - the final return type of the query + * @tparam DS - the initial result type of the query (a row basically, having status-related columns as well) + * @tparam D - the intermediate result type of the query (a row without status columns, i.e. data only) + * @tparam R - the final return type of the query (final version of result, depending on the needs, might be the same as D) */ -trait QueryWithStatus[A, B, R] { +trait QueryWithStatus[DS, D, R] { /** * Processes the status of the query and returns the status with data * @param initialResult - the initial result of the query - * @return the status with data + * @return data with status */ - def processStatus(initialResult: A): FunctionStatusWithData[B] + def processStatus(initialResult: DS): Row[D] /** * Converts the status with data to either a status exception or the data * @param statusWithData - the status with data * @return either a status exception or the data */ - def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[B]): Either[StatusException, R] + def toStatusExceptionOrData(statusWithData: Row[D]): FailedOrRow[R] /** * Returns the result of the query or a status exception * @param initialResult - the initial result of the query * @return the result of the query or a status exception */ - def getResultOrException(initialResult: A): Either[StatusException, R] = toStatusExceptionOrData( - processStatus(initialResult) - ) + def getResultOrException(initialResult: DS): FailedOrRow[R] = + toStatusExceptionOrData(processStatus(initialResult)) } diff --git a/core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala b/core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala deleted file mode 100644 index daa9de8c..00000000 --- a/core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.fadb.status - -/** - * Class represents the status of calling a fa-db function (if it supports status that is) - */ -case class FunctionStatus(statusCode: Int, statusText: String) diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregator.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregator.scala new file mode 100644 index 00000000..6b22e12a --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregator.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation + +import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow, Row} + +/** + * `StatusAggregator` is a base trait that defines the interface for aggregating the error statuses of a function + * invocation. It provides methods to aggregate the error statuses into a single status information - this is + * typically needed for database functions that retrieve multiple records. + */ +trait StatusAggregator { + + /** + * Aggregates the error status information into a single error. + * + * @param statusesWithData - The status of the function invocation with data. + * @return Either a `StatusException` if the status code indicates an error, or the data (along with the status + * information so that it's retrievable) if the status being returned doesn't indicate an error. + */ + def aggregate[R](statusesWithData: Seq[FailedOrRow[R]]): FailedOrRows[R] +} + +object StatusAggregator { + private[aggregation] def gatherExceptions[R](eithersWithException: Seq[FailedOrRow[R]]): Seq[StatusException] = { + eithersWithException.flatMap { + case Left(exception) => Some(exception) + case _ => None + } + } + + private[aggregation] def gatherDataWithStatuses[R](eithersWithData: Seq[FailedOrRow[R]]): Seq[Row[R]] = { + eithersWithData.flatMap { + case Left(_) => None + case Right(dataWithStatuses) => Some(dataWithStatuses) + } + } +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregator.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregator.scala new file mode 100644 index 00000000..ec14366d --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregator.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import za.co.absa.fadb.status.aggregation.StatusAggregator +import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow} + +/** + * `ByFirstErrorStatusAggregator` is a trait that extends the `StatusAggregator` interface. + * It provides an implementation for aggregating error statuses of a function invocation into a single error + * by choosing the first error encountered to be the representative one (i.e. if there are multiple errors of other + * types being returned, only the first one would be chosen and the rest would be ignored). + */ +trait ByFirstErrorStatusAggregator extends StatusAggregator { + + override def aggregate[R](statusesWithData: Seq[FailedOrRow[R]]): FailedOrRows[R] = { + val firstError = statusesWithData.find(_.isLeft) + + firstError match { + case Some(errRowFound) => Left(errRowFound.left.toOption.get) + case None => Right(StatusAggregator.gatherDataWithStatuses(statusesWithData)) + } + } + +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregator.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregator.scala new file mode 100644 index 00000000..8de4cda9 --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregator.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import za.co.absa.fadb.status.aggregation.StatusAggregator +import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow} + +/** + * `ByFirstRowStatusAggregator` is a trait that extends the `StatusAggregator` interface. + * It provides an implementation for aggregating error statuses of a function invocation into a single error + * by choosing the first row that was returned to be the representative one + * (i.e. if there is an error on row two or later, it would be ignored). + */ +trait ByFirstRowStatusAggregator extends StatusAggregator { + + override def aggregate[R](statusesWithData: Seq[FailedOrRow[R]]): FailedOrRows[R] = { + val firstRow = statusesWithData.headOption + + firstRow match { + case Some(exceptionOrDataWithStatuses) => exceptionOrDataWithStatuses match { + case Left(statusException) => Left(statusException) + case Right(_) => Right(StatusAggregator.gatherDataWithStatuses(statusesWithData)) + } + case None => Right(Seq.empty) + } + } + +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregator.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregator.scala new file mode 100644 index 00000000..45b8720a --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregator.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import za.co.absa.fadb.status.aggregation.StatusAggregator +import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow} + +/** + * `ByMajorityErrorsStatusAggregator` is a trait that extends the `StatusAggregator` interface. + * It provides an implementation for aggregating error statuses of a function invocation into a single error + * by choosing the error that occurred the most. + */ +trait ByMajorityErrorsStatusAggregator extends StatusAggregator { + + private[aggregation] def gimmeMajorityWinner[T](inputData: Seq[T]): Option[T] = { + if (inputData.isEmpty) { + None + } else { + val grouped = inputData.groupBy(identity) + val maxCount = grouped.values.map(_.size).max + + // list of most frequent one (or ones if maxCount is the same for multiple records) + val mostOccurredAll = grouped.filter(_._2.size == maxCount).keys.toList + + // find stops on first occurrence - so it returns the first it found from `mostOccurredAll` - in case that there + // are more 'majorityWinners', only the first one from `inputData` will be returned + val mostOccurredFirst = inputData.find(mostOccurredAll.contains(_)) + + mostOccurredFirst + } + } + + override def aggregate[R](statusesWithData: Seq[FailedOrRow[R]]): FailedOrRows[R] = { + val allErrors = StatusAggregator.gatherExceptions(statusesWithData) + val majorityError = gimmeMajorityWinner(allErrors) + + majorityError match { + case Some(statusException) => Left(statusException) + case None => Right(StatusAggregator.gatherDataWithStatuses(statusesWithData)) + } + } + +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala index 82768ac5..2e34f67c 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala @@ -16,8 +16,7 @@ package za.co.absa.fadb.status.handling -import za.co.absa.fadb.FunctionStatusWithData -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.{FailedOrRow, Row} /** * `StatusHandling` is a base trait that defines the interface for handling the status of a function invocation. @@ -28,7 +27,8 @@ trait StatusHandling { /** * Checks the status of a function invocation. * @param statusWithData - The status of the function invocation with data. - * @return Either a `StatusException` if the status code indicates an error, or the data if the status code is successful. + * @return Either a `StatusException` if the status code indicates an error, or the data (along with the status + * information so that it's retrievable) if the status code is successful. */ - def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] + def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D] } diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala index 2e491082..cd2e3ff9 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala @@ -16,8 +16,8 @@ package za.co.absa.fadb.status.handling.implementations -import za.co.absa.fadb.FunctionStatusWithData import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FailedOrRow, Row} import za.co.absa.fadb.status.handling.StatusHandling /** @@ -29,10 +29,10 @@ trait StandardStatusHandling extends StatusHandling { /** * Checks the status of a function invocation. */ - override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] = { + override def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D] = { val functionStatus = statusWithData.functionStatus functionStatus.statusCode / 10 match { - case 1 => Right(statusWithData.data) + case 1 => Right(statusWithData) case 2 => Left(ServerMisconfigurationException(functionStatus)) case 3 => Left(DataConflictException(functionStatus)) case 4 => Left(DataNotFoundException(functionStatus)) diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala index 36af1c59..548a004b 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala @@ -16,9 +16,9 @@ package za.co.absa.fadb.status.handling.implementations -import za.co.absa.fadb.FunctionStatusWithData -import za.co.absa.fadb.exceptions.{OtherStatusException, StatusException} +import za.co.absa.fadb.exceptions.OtherStatusException import za.co.absa.fadb.status.handling.StatusHandling +import za.co.absa.fadb.status.{FailedOrRow, Row} /** * Trait represents user defined status handling @@ -26,9 +26,9 @@ import za.co.absa.fadb.status.handling.StatusHandling trait UserDefinedStatusHandling extends StatusHandling { def OKStatuses: Set[Integer] - override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] = + override def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D] = if (OKStatuses.contains(statusWithData.functionStatus.statusCode)) { - Right(statusWithData.data) + Right(statusWithData) } else { Left(OtherStatusException(statusWithData.functionStatus)) } diff --git a/core/src/main/scala/za/co/absa/fadb/status/package.scala b/core/src/main/scala/za/co/absa/fadb/status/package.scala new file mode 100644 index 00000000..38f9aa2a --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/package.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb + +import za.co.absa.fadb.exceptions.StatusException + +package object status { + + /** + * Class represents the status of calling a fa-db function (if it supports status that is) + */ + case class FunctionStatus(statusCode: Int, statusText: String) + + /** + * Represents a function status with data. + * @param functionStatus the function status + * @param data the data of one row (barring the status fields) + * @tparam D the type of the data + */ + case class Row[D](functionStatus: FunctionStatus, data: D) + + /** + * This is a representation of a single row returned from a DB function with processed status information. + * + * Note: D here represents a single row reduced by status-related columns, i.e. a type of data. + */ + type FailedOrRow[D] = Either[StatusException, Row[D]] + + /** + * This is a representation of multiple rows returned from a DB function with processed status information, + * with error statuses aggregated to a single one. + * + * Note: D here represents a single row reduced by status-related columns, i.e. a type of data. + */ + type FailedOrRows[D] = Either[StatusException, Seq[Row[D]]] +} diff --git a/core/src/test/scala/za/co/absa/fadb/DBFunctionUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/DBFunctionUnitTests.scala index 236be3f0..618e99b9 100644 --- a/core/src/test/scala/za/co/absa/fadb/DBFunctionUnitTests.scala +++ b/core/src/test/scala/za/co/absa/fadb/DBFunctionUnitTests.scala @@ -20,8 +20,8 @@ import cats.MonadError import cats.implicits._ import org.scalatest.funsuite.AnyFunSuite import za.co.absa.fadb.DBFunction.DBSingleResultFunction -import za.co.absa.fadb.exceptions.StatusException import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits.namingConvention +import za.co.absa.fadb.status.FailedOrRow import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -34,7 +34,7 @@ class DBFunctionUnitTests extends AnyFunSuite { class EngineThrow extends DBEngine[Future] { override def run[R](query: QueryType[R]): Future[Seq[R]] = neverHappens - override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Either[StatusException, R]] = neverHappens + override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Seq[FailedOrRow[R]]] = neverHappens } private object FooNamed extends DBSchema diff --git a/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionUnitTests.scala index 8bbd30fc..6c5dddb6 100644 --- a/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionUnitTests.scala +++ b/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionUnitTests.scala @@ -1,5 +1,5 @@ /* - * Copyright 2022ABSA Group Limited + * Copyright 2022 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregatorUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregatorUnitTests.scala new file mode 100644 index 00000000..64cdd50d --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregatorUnitTests.scala @@ -0,0 +1,76 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation + +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, Row} + +class StatusAggregatorUnitTests extends AnyFunSuite { + + test("gatherExceptions should return empty Seq on empty input") { + val testData = Seq.empty + val expectedGatheredExceptions = Seq.empty + + val actualGatheredExceptions = StatusAggregator.gatherExceptions(testData) + assert(actualGatheredExceptions == expectedGatheredExceptions) + } + + test("gatherExceptions should gather exceptions") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(ErrorInDataException(FunctionStatus(50, "Some data error"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedGatheredExceptions = Seq( + DataNotFoundException(FunctionStatus(42, "Data not found")), + ErrorInDataException(FunctionStatus(50, "Some data error")) + ) + + val actualGatheredExceptions = StatusAggregator.gatherExceptions(testData) + assert(actualGatheredExceptions == expectedGatheredExceptions) + } + + test("gatherDataWithStatuses should return empty Seq on empty input") { + val testData = Seq.empty + val expectedGatheredExceptions = Seq.empty + + val actualGatheredExceptions = StatusAggregator.gatherDataWithStatuses(testData) + assert(actualGatheredExceptions == expectedGatheredExceptions) + } + + test("gatherDataWithStatuses should gather exceptions") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(ErrorInDataException(FunctionStatus(50, "Some data error"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedGatheredData = Seq( + Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + + val actualGatheredData = StatusAggregator.gatherDataWithStatuses(testData) + assert(actualGatheredData == expectedGatheredData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregatorUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregatorUnitTests.scala new file mode 100644 index 00000000..584a39bd --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstErrorStatusAggregatorUnitTests.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, Row} + +class ByFirstErrorStatusAggregatorUnitTests extends AnyFunSuiteLike { + + private val aggregateByFirstErrorUnderTest = new ByFirstErrorStatusAggregator {} + + test("aggregate should return Empty Seq in Right for an empty Sequence") { + val testData = Seq.empty + val expectedAggData = Right(Seq.empty) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return Seq with data in Right for a Sequence with data (no errors") { + val rawTestData = Seq( + Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + val testData = rawTestData.map(Right(_)) // wrap so that it's Seq of Eithers with data + val expectedAggData = Right(rawTestData) // wrap so that it's Either of Seq with data + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is single error status code, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is a single error status code along with data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes along with data") { + val testData = Seq( + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregatorUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregatorUnitTests.scala new file mode 100644 index 00000000..e34558f9 --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByFirstRowStatusAggregatorUnitTests.scala @@ -0,0 +1,102 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, Row} + +class ByFirstRowStatusAggregatorUnitTests extends AnyFunSuiteLike { + + private val aggregateByFirstRowUnderTest = new ByFirstRowStatusAggregator {} + + test("aggregate should return Empty Seq in Right for an empty Sequence") { + val testData = Seq.empty + val expectedAggData = Right(Seq.empty) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return Seq with data in Right for a Sequence with data (no errors") { + val rawTestData = Seq( + Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + val testData = rawTestData.map(Right(_)) // wrap so that it's Seq of Eithers with data + val expectedAggData = Right(rawTestData) // wrap so that it's Either of Seq with data + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is single error status code, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is an error status as the first row, along with data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return data only, when there are multiple error status codes on non-first row, along with data") { + val testData = Seq( + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Right( + Seq( + Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + ) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregatorUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregatorUnitTests.scala new file mode 100644 index 00000000..6c31051d --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/ByMajorityErrorsStatusAggregatorUnitTests.scala @@ -0,0 +1,133 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, Row} + +class ByMajorityErrorsStatusAggregatorUnitTests extends AnyFunSuiteLike { + + private val aggregateByMajorityErrorsUnderTest = new ByMajorityErrorsStatusAggregator {} + + test("gimmeMajorityWinner should return None for an empty Sequence") { + val testData = Seq.empty + val expectedError = None + + val actualError = aggregateByMajorityErrorsUnderTest.gimmeMajorityWinner(testData) + assert(actualError == expectedError) + } + + test("gimmeMajorityWinner should return error that occurred the most (one error has biggest distribution)") { + val testData: Seq[StatusException] = Seq( + DataNotFoundException(FunctionStatus(42, "Data not found")), + OtherStatusException(FunctionStatus(91, "Non classified error")), + ErrorInDataException(FunctionStatus(51, "Data not found another")), + DataNotFoundException(FunctionStatus(42, "Data not found")), + ) + val expectedError = DataNotFoundException(FunctionStatus(42, "Data not found")) + + val actualError = aggregateByMajorityErrorsUnderTest.gimmeMajorityWinner(testData) + assert(actualError.get == expectedError) + } + + test("gimmeMajorityWinner should return error that occurred the most (uniform distribution of errors, first wins)") { + val testData: Seq[StatusException] = Seq( + DataNotFoundException(FunctionStatus(42, "Data not found")), + OtherStatusException(FunctionStatus(91, "Non classified error")), + OtherStatusException(FunctionStatus(91, "Non classified error")), + DataNotFoundException(FunctionStatus(42, "Data not found")), + ) + val expectedError = DataNotFoundException(FunctionStatus(42, "Data not found")) + + val actualError = aggregateByMajorityErrorsUnderTest.gimmeMajorityWinner(testData) + assert(actualError.get == expectedError) + } + + test("aggregate should return Empty Seq in Right for an empty Sequence") { + val testData = Seq.empty + val expectedAggData = Right(Seq.empty) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return Seq with data in Right for a Sequence with data (no errors") { + val rawTestData = Seq( + Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + val testData = rawTestData.map(Right(_)) // wrap so that it's Seq of Eithers with data + val expectedAggData = Right(rawTestData) // wrap so that it's Either of Seq with data + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is single error status code, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes, uniform distribution, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is an error status as the first row, along with data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes with majority on err 42, along with data") { + val testData = Seq( + Right(Row(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Right(Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandlingUnitTests.scala b/core/src/test/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandlingUnitTests.scala index 6e6b94fa..ce831a78 100644 --- a/core/src/test/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandlingUnitTests.scala +++ b/core/src/test/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandlingUnitTests.scala @@ -18,9 +18,8 @@ package za.co.absa.fadb.status.handling.implementations import org.scalatest.funsuite.AnyFunSuiteLike import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper -import za.co.absa.fadb.FunctionStatusWithData import za.co.absa.fadb.exceptions._ -import za.co.absa.fadb.status.FunctionStatus +import za.co.absa.fadb.status.{FunctionStatus, Row} class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { @@ -29,16 +28,16 @@ class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { test("checkStatus should return Right when status code is in the range 10-19") { for (statusCode <- 10 to 19) { val functionStatus = FunctionStatus(statusCode, "Success") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) - result shouldBe Right("Data") + result shouldBe Right(statusWithData) } } test("checkStatus should return Left with ServerMisconfigurationException when status code is in the range 20-29") { for (statusCode <- 20 to 29) { val functionStatus = FunctionStatus(statusCode, "Server Misconfiguration") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(ServerMisconfigurationException(functionStatus)) } @@ -47,7 +46,7 @@ class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { test("checkStatus should return Left with DataConflictException when status code is in the range 30-39") { for (statusCode <- 30 to 39) { val functionStatus = FunctionStatus(statusCode, "Data Conflict") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(DataConflictException(functionStatus)) } @@ -56,7 +55,7 @@ class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { test("checkStatus should return Left with DataNotFoundException when status code is in the range 40-49") { for (statusCode <- 40 to 49) { val functionStatus = FunctionStatus(statusCode, "Data Not Found") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(DataNotFoundException(functionStatus)) } @@ -65,7 +64,7 @@ class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { test("checkStatus should return Left with ErrorInDataException when status code is in the range 50-89") { for (statusCode <- 50 to 89) { val functionStatus = FunctionStatus(statusCode, "Error in Data") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(ErrorInDataException(functionStatus)) } @@ -74,7 +73,7 @@ class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { test("checkStatus should return Left with OtherStatusException when status code is in the range 90-99") { for (statusCode <- 90 to 99) { val functionStatus = FunctionStatus(statusCode, "Other Status") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(OtherStatusException(functionStatus)) } @@ -83,14 +82,14 @@ class StandardStatusHandlingUnitTests extends AnyFunSuiteLike { test("checkStatus should return Left with StatusOutOfRangeException when status code is not in any known range") { for (statusCode <- 0 to 9) { val functionStatus = FunctionStatus(statusCode, "Out of range") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(StatusOutOfRangeException(functionStatus)) } for (statusCode <- 100 to 110) { val functionStatus = FunctionStatus(statusCode, "Out of range") - val statusWithData = FunctionStatusWithData(functionStatus, "Data") + val statusWithData = Row(functionStatus, "Data") val result = standardQueryStatusHandling.checkStatus(statusWithData) result shouldBe Left(StatusOutOfRangeException(functionStatus)) } diff --git a/demo_database/src/main/postgres/integration/V1.2.99__insert_test_data.sql b/demo_database/src/main/postgres/integration/V1.2.99__insert_test_data.sql index a8351354..33c4ecee 100644 --- a/demo_database/src/main/postgres/integration/V1.2.99__insert_test_data.sql +++ b/demo_database/src/main/postgres/integration/V1.2.99__insert_test_data.sql @@ -16,7 +16,9 @@ INSERT INTO integration.actors VALUES (49, 'Pavel', 'Marek'), - (50, 'Liza', 'Simpson'); + (50, 'Liza', 'Simpson'), + (51, 'Fred', 'Weasley'), + (52, 'George', 'Weasley'); INSERT INTO integration.other_types VALUES ( diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala index d33585b1..180ac534 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala @@ -22,7 +22,7 @@ import doobie._ import doobie.implicits._ import doobie.util.Read import za.co.absa.fadb.DBEngine -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.FailedOrRow import scala.language.higherKinds @@ -53,36 +53,37 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ } /** - * Executes a Doobie query and returns the result as an `F[Either[StatusException, R]]`. + * Executes a Doobie query and returns the result. + * + * Note: `StatusWithData` is needed here because it is more 'flat' in comparison to `FunctionStatusWithData` + * and Doobie's `Read` wasn't able to work with it. * * @param query the Doobie query to execute * @param readStatusWithDataR the `Read[StatusWithData[R]]` instance used to read the query result into `StatusWithData[R]` - * @return the query result as an `F[Either[StatusException, R]]` + * @return the query result */ private def executeQueryWithStatus[R]( query: QueryWithStatusType[R] - )(implicit readStatusWithDataR: Read[StatusWithData[R]]): F[Either[StatusException, R]] = { - // .unique returns a single value, raising an exception if there is not exactly one row returned - // https://tpolecat.github.io/doobie/docs/04-Selecting.html - query.fragment.query[StatusWithData[R]].unique.transact(transactor).map(query.getResultOrException) + )(implicit readStatusWithDataR: Read[StatusWithData[R]]): F[Seq[FailedOrRow[R]]] = { + query.fragment.query[StatusWithData[R]].to[Seq].transact(transactor).map(_.map(query.getResultOrException)) } /** * Runs a Doobie query and returns the result as an `F[Seq[R]]`. * * @param query the Doobie query to run - * @return the query result as an `F[Seq[R]]` + * @return the query result */ override def run[R](query: QueryType[R]): F[Seq[R]] = executeQuery(query)(query.readR) /** - * Runs a Doobie query and returns the result as an `F[Either[StatusException, R]]`. + * Runs a Doobie query and returns the result. * * @param query the Doobie query to run - * @return the query result as an `F[Either[StatusException, R]]` + * @return the query result */ - override def runWithStatus[R](query: QueryWithStatusType[R]): F[Either[StatusException, R]] = { + override def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]] = { executeQueryWithStatus(query)(query.readStatusWithDataR) } } diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala index 1070f2c8..f58896c5 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala @@ -21,8 +21,8 @@ import doobie.implicits.toSqlInterpolator import doobie.util.Read import doobie.util.fragment.Fragment import za.co.absa.fadb.DBFunction._ -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.{DBFunctionWithStatus, DBSchema, FunctionStatusWithData} +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.status.{FailedOrRow, Row} import scala.language.higherKinds @@ -196,7 +196,7 @@ trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] { } // This is to be mixed in by an implementation of StatusHandling - def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] + def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D] } /** @@ -206,16 +206,36 @@ trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] { object DoobieFunction { /** - * `DoobieSingleResultFunctionWithStatus` represents a db function that returns a single result with status. + * `DoobieSingleResultFunction` represents a db function that returns a single result. * * @param toFragmentsSeq a function that generates a sequence of `Fragment`s * @param functionNameOverride the optional override for the function name * @param schema the database schema * @param dbEngine the `DoobieEngine` instance used to execute SQL queries - * @param readR Read instance for `R` - * @param readSelectWithStatus Read instance for `StatusWithData[R]` + * @param readR the `Read[R]` instance used to read the query result into `R` * @tparam F the effect type, which must have an `Async` and a `Monad` instance */ + abstract class DoobieSingleResultFunction[I, R, F[_]]( + override val toFragmentsSeq: I => Seq[Fragment], + functionNameOverride: Option[String] = None + )(implicit + override val schema: DBSchema, + val dbEngine: DoobieEngine[F], + val readR: Read[R] + ) extends DBSingleResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) + with DoobieFunction[I, R, F] + + /** + * `DoobieSingleResultFunctionWithStatus` represents a db function that returns a single result with status. + * + * @param toFragmentsSeq a function that generates a sequence of `Fragment`s + * @param functionNameOverride the optional override for the function name + * @param schema the database schema + * @param dbEngine the `DoobieEngine` instance used to execute SQL queries + * @param readR Read instance for `R` + * @param readSelectWithStatus Read instance for `StatusWithData[R]` + * @tparam F the effect type, which must have an `Async` and a `Monad` instance + */ abstract class DoobieSingleResultFunctionWithStatus[I, R, F[_]]( override val toFragmentsSeq: I => Seq[Fragment], functionNameOverride: Option[String] = None @@ -224,41 +244,51 @@ object DoobieFunction { val dbEngine: DoobieEngine[F], val readR: Read[R], val readSelectWithStatus: Read[StatusWithData[R]] - ) extends DBFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) + ) extends DBSingleResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) with DoobieFunctionWithStatus[I, R, F] /** - * `DoobieSingleResultFunction` represents a db function that returns a single result. - * - * @param toFragmentsSeq a function that generates a sequence of `Fragment`s - * @param functionNameOverride the optional override for the function name - * @param schema the database schema - * @param dbEngine the `DoobieEngine` instance used to execute SQL queries - * @param readR the `Read[R]` instance used to read the query result into `R` - * @tparam F the effect type, which must have an `Async` and a `Monad` instance + * `DoobieMultipleResultFunction` represents a db function that returns multiple results. */ - abstract class DoobieSingleResultFunction[I, R, F[_]]( + abstract class DoobieMultipleResultFunction[I, R, F[_]]( override val toFragmentsSeq: I => Seq[Fragment], functionNameOverride: Option[String] = None )(implicit override val schema: DBSchema, val dbEngine: DoobieEngine[F], val readR: Read[R] - ) extends DBSingleResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) + ) extends DBMultipleResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) with DoobieFunction[I, R, F] /** - * `DoobieMultipleResultFunction` represents a db function that returns multiple results. + * `DoobieMultipleResultFunctionWithStatus` represents a db function that returns multiple results with statuses. */ - abstract class DoobieMultipleResultFunction[I, R, F[_]]( + abstract class DoobieMultipleResultFunctionWithStatus[I, R, F[_]]( override val toFragmentsSeq: I => Seq[Fragment], functionNameOverride: Option[String] = None )(implicit override val schema: DBSchema, val dbEngine: DoobieEngine[F], val readR: Read[R] - ) extends DBMultipleResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) - with DoobieFunction[I, R, F] + ) extends DBMultipleResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) + with DoobieFunctionWithStatus[I, R, F] + + /** + * `DoobieMultipleResultFunctionWithAggStatus` represents a db function that returns multiple results with statuses. + * + * It's similar as `DoobieMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value. + * + * The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`. + */ + abstract class DoobieMultipleResultFunctionWithAggStatus[I, R, F[_]]( + override val toFragmentsSeq: I => Seq[Fragment], + functionNameOverride: Option[String] = None + )(implicit + override val schema: DBSchema, + val dbEngine: DoobieEngine[F], + val readR: Read[R] + ) extends DBMultipleResultFunctionWithAggStatus[I, R, DoobieEngine[F], F](functionNameOverride) + with DoobieFunctionWithStatus[I, R, F] /** * `DoobieOptionalResultFunction` represents a db function that returns an optional result. @@ -272,4 +302,17 @@ object DoobieFunction { val readR: Read[R] ) extends DBOptionalResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) with DoobieFunction[I, R, F] + + /** + * `DoobieOptionalResultFunctionWithStatus` represents a db function that returns an optional result. + */ + abstract class DoobieOptionalResultFunctionWithStatus[I, R, F[_]]( + override val toFragmentsSeq: I => Seq[Fragment], + functionNameOverride: Option[String] = None + )(implicit + override val schema: DBSchema, + val dbEngine: DoobieEngine[F], + val readR: Read[R] + ) extends DBOptionalResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) + with DoobieFunctionWithStatus[I, R, F] } diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala index 0909d363..fedeb57e 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala @@ -18,9 +18,8 @@ package za.co.absa.fadb.doobie import doobie.util.Read import doobie.util.fragment.Fragment -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.status.FunctionStatus -import za.co.absa.fadb.{FunctionStatusWithData, Query, QueryWithStatus} +import za.co.absa.fadb.status.{FailedOrRow, FunctionStatus, Row} +import za.co.absa.fadb.{Query, QueryWithStatus} /** * `DoobieQuery` is a class that extends `Query` with `R` as the result type. @@ -41,23 +40,23 @@ class DoobieQuery[R](val fragment: Fragment)(implicit val readR: Read[R]) extend */ class DoobieQueryWithStatus[R]( val fragment: Fragment, - checkStatus: FunctionStatusWithData[R] => Either[StatusException, R] + checkStatus: Row[R] => FailedOrRow[R] )(implicit val readStatusWithDataR: Read[StatusWithData[R]]) extends QueryWithStatus[StatusWithData[R], R, R] { /* * Processes the status of the query and returns the status with data * @param initialResult - the initial result of the query - * @return the status with data + * @return data with status */ - override def processStatus(initialResult: StatusWithData[R]): FunctionStatusWithData[R] = - FunctionStatusWithData(FunctionStatus(initialResult.status, initialResult.statusText), initialResult.data) + override def processStatus(initialResult: StatusWithData[R]): Row[R] = + Row(FunctionStatus(initialResult.status, initialResult.statusText), initialResult.data) /* * Converts the status with data to either a status exception or the data * @param statusWithData - the status with data * @return either a status exception or the data */ - override def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[R]): Either[StatusException, R] = + override def toStatusExceptionOrData(statusWithData: Row[R]): FailedOrRow[R] = checkStatus(statusWithData) } diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/StatusWithData.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/StatusWithData.scala index fefaebd5..d51c6b2c 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/StatusWithData.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/StatusWithData.scala @@ -17,6 +17,11 @@ package za.co.absa.fadb.doobie /** - * Represents a function status with data. + * Represents a function status with data (basically a row returned from a DB). + * + * Note: data here represent one row that has status-related fields omitted. In some scenarios, it actually might + * be missing - i.e. query returns only status information with no data - pretty common scenario when a DB + * function returns error, e.g.: (41, 'NoDataFound', NULL). In this case, R must be an Option - to be specified + * by a user. */ case class StatusWithData[R](status: Int, statusText: String, data: R) diff --git a/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithAggStatusIntegrationTests.scala b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithAggStatusIntegrationTests.scala new file mode 100644 index 00000000..ffa433d0 --- /dev/null +++ b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithAggStatusIntegrationTests.scala @@ -0,0 +1,100 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.doobie + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import doobie.Fragment +import doobie.implicits.toSqlInterpolator +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.fadb.status.aggregation.implementations.ByMajorityErrorsStatusAggregator +import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.fadb.status.{FunctionStatus, Row} + +class DoobieMultipleResultFunctionWithAggStatusIntegrationTests extends AnyFunSuite with DoobieTest { + + private val getActorsByLastnameQueryFragments: GetActorsByLastnameQueryParameters => Seq[Fragment] = { + values => Seq(fr"${values.lastName}", fr"${values.firstName}") + } + + class GetActorsByLastname(implicit schema: DBSchema, dbEngine: DoobieEngine[IO]) + // Option[Actor] because: Actor might not exist, and the function would return only status info without actor data + extends DoobieMultipleResultFunctionWithAggStatus[GetActorsByLastnameQueryParameters, Option[Actor], IO](getActorsByLastnameQueryFragments) + with StandardStatusHandling + with ByMajorityErrorsStatusAggregator { + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") + } + + private val getActorsByLastname = new GetActorsByLastname()(Integration, new DoobieEngine(transactor)) + + test("Retrieving multiple actors from database, lastName match") { + val expectedResultElem = Set( + Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(51, "Fred", "Weasley"))), + Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(52, "George", "Weasley"))), + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).unsafeRunSync() + val actualData = results match { + case Left(_) => fail("should not be left") + case Right(dataWithStatuses) => dataWithStatuses + } + assert(actualData.length == expectedResultElem.size) + assert(actualData.toSet == expectedResultElem) + } + + test("Retrieving single actor from database, full match") { + val expectedResultElem = Row( + FunctionStatus(12, "OK, full match"), Some(Actor(50, "Liza", "Simpson")) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson", Some("Liza"))).unsafeRunSync() + val actualData = results match { + case Left(_) => fail("should not be left") + case Right(dataWithStatuses) => dataWithStatuses + } + + assert(actualData.length == 1) + assert(actualData.head == expectedResultElem) + } + + test("Retrieving single actor from database, lastname match") { + val expectedResultElem = Row( + FunctionStatus(11, "OK, match on last name only"), Some(Actor(50, "Liza", "Simpson")) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson")).unsafeRunSync() + val actualData = results match { + case Left(_) => fail("should not be left") + case Right(dataWithStatuses) => dataWithStatuses + } + + assert(actualData.length == 1) + assert(actualData.head == expectedResultElem) + } + + test("Retrieving non-existing actor from database, no match") { + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("TotallyNonExisting!")).unsafeRunSync() + results match { + case Left(err) => + assert(err.status.statusText == "No actor found") + assert(err.status.statusCode == 41) + case Right(_) => fail("should not be right") + } + } +} diff --git a/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusIntegrationTests.scala b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusIntegrationTests.scala new file mode 100644 index 00000000..dd0122f4 --- /dev/null +++ b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusIntegrationTests.scala @@ -0,0 +1,83 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.doobie + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import doobie.Fragment +import doobie.implicits.toSqlInterpolator +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithStatus +import za.co.absa.fadb.exceptions.{DataNotFoundException, StatusException} +import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.fadb.status.{FunctionStatus, Row} + +class DoobieMultipleResultFunctionWithStatusIntegrationTests extends AnyFunSuite with DoobieTest { + + private val getActorsByLastnameQueryFragments: GetActorsByLastnameQueryParameters => Seq[Fragment] = { values => + Seq(fr"${values.lastName}", fr"${values.firstName}") + } + + class GetActorsByLastname(implicit schema: DBSchema, dbEngine: DoobieEngine[IO]) + // Option[Actor] because: Actor might not exist, and the function would return only status info without actor data + extends DoobieMultipleResultFunctionWithStatus[GetActorsByLastnameQueryParameters, Option[Actor], IO]( + getActorsByLastnameQueryFragments + ) + with StandardStatusHandling { + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") + } + + private val getActorsByLastname = new GetActorsByLastname()(Integration, new DoobieEngine(transactor)) + + test("Retrieving multiple actors from database, lastName match") { + val expectedResultElem = Set( + Right(Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(51, "Fred", "Weasley")))), + Right(Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(52, "George", "Weasley")))) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).unsafeRunSync() + assert(results.toSet == expectedResultElem) + } + + test("Retrieving single actor from database, full match") { + val expectedResultElem = Set( + Right(Row(FunctionStatus(12, "OK, full match"), Some(Actor(50, "Liza", "Simpson")))) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson", Some("Liza"))).unsafeRunSync() + assert(results.toSet == expectedResultElem) + } + + test("Retrieving single actor from database, lastname match") { + val expectedResultElem = Set( + Right(Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(50, "Liza", "Simpson")))) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson")).unsafeRunSync() + assert(results.toSet == expectedResultElem) + } + + test("Retrieving non-existing actor from database, no match") { + val expectedErr = Left(DataNotFoundException(FunctionStatus(41, "No actor found"))) + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("TotallyNonExisting!")).unsafeRunSync() + + assert(results.length == 1) + assert(results.head.isLeft) + assert(results.head == expectedErr) + } +} diff --git a/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieSingleResultFunctionWithStatusIntegrationTests.scala b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieSingleResultFunctionWithStatusIntegrationTests.scala index 9521b3b7..ead0d48c 100644 --- a/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieSingleResultFunctionWithStatusIntegrationTests.scala +++ b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieSingleResultFunctionWithStatusIntegrationTests.scala @@ -84,7 +84,7 @@ class DoobieSingleResultFunctionWithStatusIntegrationTests extends AnyFunSuite w val result = errorIfNotOne(1).unsafeRunSync() result match { case Left(_) => fail("should not be left") - case Right(value) => assert(value.contains(1)) + case Right(value) => assert(value.data.contains(1)) } } } diff --git a/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieTest.scala b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieTest.scala index fe13f1aa..170c41bd 100644 --- a/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieTest.scala +++ b/doobie/src/test/scala/za/co/absa/fadb/doobie/DoobieTest.scala @@ -24,6 +24,7 @@ import za.co.absa.fadb.DBSchema trait DoobieTest { case class Actor(actorId: Int, firstName: String, lastName: String) case class GetActorsQueryParameters(firstName: Option[String], lastName: Option[String]) + case class GetActorsByLastnameQueryParameters(lastName: String, firstName: Option[String] = None) case class CreateActorRequestBody(firstName: String, lastName: String) import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._ diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala index f1720913..89e351cb 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala @@ -18,9 +18,9 @@ package za.co.absa.fadb.slick import cats.MonadError import slick.jdbc.{GetResult, SQLActionBuilder} -import za.co.absa.fadb.DBFunction.{DBMultipleResultFunction, DBOptionalResultFunction, DBSingleResultFunction} -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.{DBFunctionWithStatus, DBSchema, FunctionStatusWithData} +import za.co.absa.fadb.DBFunction._ +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.status.{FailedOrRow, Row} import scala.concurrent.Future import scala.language.higherKinds @@ -89,43 +89,66 @@ private[slick] trait SlickFunctionWithStatus[I, R] extends SlickFunctionBase[I, } // Expected to be mixed in by an implementation of StatusHandling - def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] + def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D] } object SlickFunction { /** - * Class for Slick DB functions with status support. + * Class for Slick DB functions with single result. */ + abstract class SlickSingleResultFunction[I, R]( + functionNameOverride: Option[String] = None + )(implicit + override val schema: DBSchema, + dBEngine: SlickPgEngine + ) extends DBSingleResultFunction[I, R, SlickPgEngine, Future](functionNameOverride) + with SlickFunction[I, R] + + /** + * Similar as above but with the status support. + */ abstract class SlickSingleResultFunctionWithStatus[I, R]( functionNameOverride: Option[String] = None )(implicit override val schema: DBSchema, dBEngine: SlickPgEngine - ) extends DBFunctionWithStatus[I, R, SlickPgEngine, Future](functionNameOverride) + ) extends DBSingleResultFunctionWithStatus[I, R, SlickPgEngine, Future](functionNameOverride) with SlickFunctionWithStatus[I, R] /** - * Class for Slick DB functions with single result. - */ - abstract class SlickSingleResultFunction[I, R]( + * Class for Slick DB functions with multiple results. + */ + abstract class SlickMultipleResultFunction[I, R]( functionNameOverride: Option[String] = None )(implicit override val schema: DBSchema, dBEngine: SlickPgEngine - ) extends DBSingleResultFunction[I, R, SlickPgEngine, Future](functionNameOverride) + ) extends DBMultipleResultFunction[I, R, SlickPgEngine, Future](functionNameOverride) with SlickFunction[I, R] /** - * Class for Slick DB functions with multiple results. + * Similar as above but with the status support. + */ + abstract class SlickMultipleResultFunctionWithStatus[I, R]( + functionNameOverride: Option[String] = None + )(implicit + override val schema: DBSchema, + dBEngine: SlickPgEngine + ) extends DBMultipleResultFunctionWithStatus[I, R, SlickPgEngine, Future](functionNameOverride) + with SlickFunctionWithStatus[I, R] + + /** + * Similar as `SlickMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value. + * The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`. */ - abstract class SlickMultipleResultFunction[I, R]( + abstract class SlickMultipleResultFunctionWithAggStatus[I, R]( functionNameOverride: Option[String] = None )(implicit override val schema: DBSchema, dBEngine: SlickPgEngine - ) extends DBMultipleResultFunction[I, R, SlickPgEngine, Future](functionNameOverride) - with SlickFunction[I, R] + ) extends DBMultipleResultFunctionWithAggStatus[I, R, SlickPgEngine, Future](functionNameOverride) + with SlickFunctionWithStatus[I, R] /** * Class for Slick DB functions with optional result. @@ -137,4 +160,15 @@ object SlickFunction { dBEngine: SlickPgEngine ) extends DBOptionalResultFunction[I, R, SlickPgEngine, Future](functionNameOverride) with SlickFunction[I, R] + + /** + * Similar as above but with the status support. + */ + abstract class SlickOptionalResultFunctionWithStatus[I, R]( + functionNameOverride: Option[String] = None + )(implicit + override val schema: DBSchema, + dBEngine: SlickPgEngine + ) extends DBOptionalResultFunctionWithStatus[I, R, SlickPgEngine, Future](functionNameOverride) + with SlickFunctionWithStatus[I, R] } diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala index 61eba4c3..1fd0ad03 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala @@ -19,7 +19,7 @@ package za.co.absa.fadb.slick import cats.implicits._ import slick.jdbc.PostgresProfile.api._ import za.co.absa.fadb.DBEngine -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.FailedOrRow import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds @@ -55,8 +55,8 @@ class SlickPgEngine(val db: Database)(implicit val executor: ExecutionContext) e * @tparam R - return the of the query * @return - either status exception or result of database query */ - override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Either[StatusException, R]] = { - val slickAction = query.sql.as[Either[StatusException, R]](query.getStatusExceptionOrData).head + override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Seq[FailedOrRow[R]]] = { + val slickAction = query.sql.as[FailedOrRow[R]](query.getStatusExceptionOrData) db.run(slickAction) } } diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala index 4260e856..266c038b 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala @@ -17,9 +17,8 @@ package za.co.absa.fadb.slick import slick.jdbc.{GetResult, PositionedResult, SQLActionBuilder} -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.status.FunctionStatus -import za.co.absa.fadb.{FunctionStatusWithData, Query, QueryWithStatus} +import za.co.absa.fadb.status.{FailedOrRow, FunctionStatus, Row} +import za.co.absa.fadb.{Query, QueryWithStatus} /** * SQL query representation for Slick @@ -40,18 +39,18 @@ class SlickQuery[R](val sql: SQLActionBuilder, val getResult: GetResult[R]) exte class SlickQueryWithStatus[R]( val sql: SQLActionBuilder, val getResult: GetResult[R], - checkStatus: FunctionStatusWithData[PositionedResult] => Either[StatusException, PositionedResult] + checkStatus: Row[PositionedResult] => FailedOrRow[PositionedResult] ) extends QueryWithStatus[PositionedResult, PositionedResult, R] { /** * Processes the status of the query and returns the status with data * @param initialResult - the initial result of the query - * @return the status with data + * @return data with status */ - override def processStatus(initialResult: PositionedResult): FunctionStatusWithData[PositionedResult] = { + override def processStatus(initialResult: PositionedResult): Row[PositionedResult] = { val status: Int = initialResult.<< val statusText: String = initialResult.<< - FunctionStatusWithData(FunctionStatus(status, statusText), initialResult) + Row(FunctionStatus(status, statusText), initialResult) } /** @@ -60,20 +59,26 @@ class SlickQueryWithStatus[R]( * @return either a status exception or the data */ override def toStatusExceptionOrData( - statusWithData: FunctionStatusWithData[PositionedResult] - ): Either[StatusException, R] = { + statusWithData: Row[PositionedResult] + ): FailedOrRow[R] = { checkStatus(statusWithData) match { - case Left(statusException) => Left(statusException) - case Right(value) => Right(getResult(value)) + case Left(statusException) => Left(statusException) + case Right(value) => + val status = value.functionStatus + val data = getResult(value.data) + Right(Row(status, data)) } } /** * Combines the processing of the status and the conversion of the status with data to either a status exception or the data + * + * Note: GetResult processes data row by row. + * * @return the GetResult, that combines the processing of the status and the conversion of the status with data * to either a status exception or the data */ - def getStatusExceptionOrData: GetResult[Either[StatusException, R]] = { + def getStatusExceptionOrData: GetResult[FailedOrRow[R]] = { GetResult(pr => processStatus(pr)).andThen(fs => toStatusExceptionOrData(fs)) } } diff --git a/core/src/main/scala/za/co/absa/fadb/FunctionStatusWithData.scala b/slick/src/test/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala similarity index 57% rename from core/src/main/scala/za/co/absa/fadb/FunctionStatusWithData.scala rename to slick/src/test/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala index d93a4b85..e5904d61 100644 --- a/core/src/main/scala/za/co/absa/fadb/FunctionStatusWithData.scala +++ b/slick/src/test/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala @@ -14,14 +14,20 @@ * limitations under the License. */ -package za.co.absa.fadb +package za.co.absa.fadb.slick -import za.co.absa.fadb.status.FunctionStatus +import slick.jdbc.{GetResult, PositionedResult} /** - * Represents a function status with data. - * @param functionStatus the function status - * @param data the data - * @tparam A the type of the data + * A trait representing a converter from a Slick PositionedResult to an Actor. + * The trait is to be mixed into a SlickFunction returning an Actor. */ -case class FunctionStatusWithData[A](functionStatus: FunctionStatus, data: A) +trait OptionalActorSlickConverter { + + protected def slickConverter: GetResult[Option[Actor]] = { + def converter(r: PositionedResult): Option[Actor] = { + Some(Actor(r.<<, r.<<, r.<<)) + } + GetResult(converter) + } +} diff --git a/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionIntegrationTests.scala b/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionIntegrationTests.scala index 19b9b942..cc7e692d 100644 --- a/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionIntegrationTests.scala +++ b/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionIntegrationTests.scala @@ -42,8 +42,11 @@ class SlickMultipleResultFunctionIntegrationTests extends AnyFunSuite with Slick private val getActors = new GetActors()(Integration, new SlickPgEngine(db)) test("Retrieving actors from database") { - val expectedResultElem = Actor(49, "Pavel", "Marek") - val results = getActors(GetActorsQueryParameters(Some("Pavel"), Some("Marek"))) - assert(results.futureValue.contains(expectedResultElem)) + val expectedResultElem = Set( + Actor(51, "Fred", "Weasley"), + Actor(52, "George", "Weasley"), + ) + val results = getActors(GetActorsQueryParameters(lastName=Some("Weasley"), firstName=None)).futureValue + assert(results.toSet == expectedResultElem) } } diff --git a/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithAggStatusIntegrationTests.scala b/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithAggStatusIntegrationTests.scala new file mode 100644 index 00000000..55e4f06f --- /dev/null +++ b/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithAggStatusIntegrationTests.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.slick + +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import slick.jdbc.SQLActionBuilder +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.slick.FaDbPostgresProfile.api._ +import za.co.absa.fadb.slick.SlickFunction.SlickMultipleResultFunctionWithAggStatus +import za.co.absa.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.fadb.status.{FunctionStatus, Row} + +import scala.concurrent.ExecutionContext.Implicits.global + + +class SlickMultipleResultFunctionWithAggStatusIntegrationTests extends AnyFunSuite with SlickTest with ScalaFutures { + + class GetActorsByLastname(implicit override val schema: DBSchema, val dbEngine: SlickPgEngine) + extends SlickMultipleResultFunctionWithAggStatus[GetActorsByLastnameQueryParameters, Option[Actor]] + with StandardStatusHandling + with ByFirstErrorStatusAggregator + with OptionalActorSlickConverter { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") + + override def sql(values: GetActorsByLastnameQueryParameters): SQLActionBuilder = { + sql"""SELECT #$selectEntry FROM #$functionName(${values.lastName},${values.firstName}) #$alias;""" + } + } + + private val getActorsByLastname = new GetActorsByLastname()(Integration, new SlickPgEngine(db)) + + test("Retrieving actors from database") { + val expectedResultElem = Set( + Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(51, "Fred", "Weasley"))), + Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(52, "George", "Weasley"))) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).futureValue + val actualData = results match { + case Left(_) => fail("should not be left") + case Right(dataWithStatuses) => dataWithStatuses + } + assert(actualData.length == 2) + assert(actualData.toSet == expectedResultElem) + + } +} diff --git a/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusIntegrationTests.scala b/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusIntegrationTests.scala new file mode 100644 index 00000000..364a71e4 --- /dev/null +++ b/slick/src/test/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusIntegrationTests.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.slick + +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import slick.jdbc.SQLActionBuilder +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.slick.FaDbPostgresProfile.api._ +import za.co.absa.fadb.slick.SlickFunction.SlickMultipleResultFunctionWithStatus +import za.co.absa.fadb.status.{FunctionStatus, Row} +import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling + +import scala.concurrent.ExecutionContext.Implicits.global + + +class SlickMultipleResultFunctionWithStatusIntegrationTests extends AnyFunSuite with SlickTest with ScalaFutures { + + class GetActorsByLastname(implicit override val schema: DBSchema, val dbEngine: SlickPgEngine) + extends SlickMultipleResultFunctionWithStatus[GetActorsByLastnameQueryParameters, Option[Actor]] + with StandardStatusHandling + with OptionalActorSlickConverter { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") + + override def sql(values: GetActorsByLastnameQueryParameters): SQLActionBuilder = { + sql"""SELECT #$selectEntry FROM #$functionName(${values.lastName},${values.firstName}) #$alias;""" + } + } + + private val getActorsByLastname = new GetActorsByLastname()(Integration, new SlickPgEngine(db)) + + test("Retrieving actors from database") { + val expectedResultElem = Set( + Right(Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(51, "Fred", "Weasley")))), + Right(Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(52, "George", "Weasley")))) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).futureValue + assert(results.toSet == expectedResultElem) + } +} diff --git a/slick/src/test/scala/za/co/absa/fadb/slick/SlickTest.scala b/slick/src/test/scala/za/co/absa/fadb/slick/SlickTest.scala index 669e97ed..ce7fa2ac 100644 --- a/slick/src/test/scala/za/co/absa/fadb/slick/SlickTest.scala +++ b/slick/src/test/scala/za/co/absa/fadb/slick/SlickTest.scala @@ -20,8 +20,10 @@ import slick.jdbc.JdbcBackend.Database import za.co.absa.fadb.DBSchema trait SlickTest { - case class CreateActorRequestBody(firstName: String, lastName: String) case class GetActorsQueryParameters(firstName: Option[String], lastName: Option[String]) + case class GetActorsByLastnameQueryParameters(lastName: String, firstName: Option[String] = None) + + case class CreateActorRequestBody(firstName: String, lastName: String) import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._ object Integration extends DBSchema