Skip to content

Commit

Permalink
Merge branch 'master' into feature/126-bring-worklow-actions-up-to-date
Browse files Browse the repository at this point in the history
  • Loading branch information
benedeki authored Jun 25, 2024
2 parents 16e3ebd + 2e0631a commit 01e0096
Show file tree
Hide file tree
Showing 37 changed files with 1,360 additions and 177 deletions.
43 changes: 34 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ within the application.**

---

Currently, the library is developed with Postgres as the target DB. But the approach is applicable to any DB supporting stored procedure/functions – Oracle, MS-SQL, ...
Currently, the library is developed with Postgres as the target DB. But the approach is applicable to any DB
supporting stored procedure/functions – Oracle, MS-SQL, ...


## Usage

#### Sbt

Import one of the two available module at the moment. Slick module works with Scala Futures. Doobie module works with any effect type (typically IO or ZIO) provided cats effect's Async instance is available.
Import one of the two available module at the moment. Slick module works with Scala Futures. Doobie module works with
any effect type (typically IO or ZIO) provided cats effect's Async instance is available.

```scala
libraryDependencies *= "za.co.absa.fa-db" %% "slick" % "X.Y.Z"
Expand Down Expand Up @@ -111,16 +113,27 @@ Text about status codes returned from the database function can be found [here](

## Slick module

As the name suggests it runs on [Slick library](https://github.com/slick/slick) and also brings in the [Slickpg library](https://github.com/tminglei/slick-pg/) for extended Postgres type support.
As the name suggests it runs on [Slick library](https://github.com/slick/slick) and also brings in the [Slickpg library](https://github.com/tminglei/slick-pg/) for extended
Postgres type support.

It brings:

* `class SlickPgEngine` - implementation of _Core_'s `DBEngine` executing the queries via Slick
* `class SlickSingleResultFunction` - abstract class for DB functions returning single result
* `class SlickMultipleResultFunction` - abstract class for DB functions returning sequence of results
* `class SlickOptionalResultFunction` - abstract class for DB functions returning optional result
* `class SlickSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box)
* `trait FaDbPostgresProfile` - to bring support for Postgres and its extended data types in one class (except JSON, as there are multiple implementations for this data type in _Slick-Pg_)
* `class SlickSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an
implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box)
* `class SlickMultipleResultFunctionWithStatus` - as `SlickSingleResultFunctionWithStatus` but for multiple record
retrieval
* `class SlickMultipleResultFunctionWithAggStatus` - as `SlickMultipleResultFunctionWithStatus` but it aggregates
the statuses into a single record. It requires an implementation of `StatusAggregator` to be mixed-in
(`ByFirstErrorStatusAggregator`, `ByFirstRowStatusAggregator`, and `ByMajorityErrorsStatusAggregator` available
out of the box)
* `class SlickOptionalResultFunctionWithStatus` - as `SlickSingleResultFunctionWithStatus` but the returning record
is optional
* `trait FaDbPostgresProfile` - to bring support for Postgres and its extended data types in one class
(except JSON, as there are multiple implementations for this data type in _Slick-Pg_)
* `object FaDbPostgresProfile` - instance of the above trait for direct use

#### Known issues
Expand All @@ -143,17 +156,29 @@ val macAddr: Option[MacAddrString] = pr.nextMacAddrOption

## Doobie module

As the name suggests it runs on [Doobie library](https://tpolecat.github.io/doobie/). The main benefit of the module is that it allows to use any effect type (typically IO or ZIO) therefore is more suitable for functional programming. It also brings in the [Doobie-Postgres library](https://tpolecat.github.io/doobie/docs/14-PostgreSQL.html) for extended Postgres type support.
As the name suggests it runs on [Doobie library](https://tpolecat.github.io/doobie/). The main benefit of the module is that it allows to use any
effect type (typically IO or ZIO) therefore is more suitable for functional programming.
It also brings in the [Doobie-Postgres library](https://tpolecat.github.io/doobie/docs/14-PostgreSQL.html) for extended Postgres type support.

It brings:

* `class DoobieEngine` - implementation of _Core_'s `DBEngine` executing the queries via Doobie. The class is type parameterized with the effect type.
* `class DoobieSingleResultFunction` - abstract class for DB functions returning single result
* `class DoobieMultipleResultFunction` - abstract class for DB functions returning sequence of results
* `class DoobieOptionalResultFunction` - abstract class for DB functions returning optional result
* `class DoobieSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box)

Since Doobie also interoperates with ZIO, there is an example of how a database connection can be properly established within a ZIO application. Please see [this file](doobie/zio-setup.md) for more details.
* `class DoobieSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires
an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box)
* `class DoobieMultipleResultFunctionWithStatus` - as `DoobieSingleResultFunctionWithStatus` but for multiple record
retrieval
* `class DoobieMultipleResultFunctionWithAggStatus` - as `DoobieMultipleResultFunctionWithStatus` but it aggregates
the statuses into a single record. It requires an implementation of `StatusAggregator` to be mixed-in
(`ByFirstErrorStatusAggregator`, `ByFirstRowStatusAggregator`, and `ByMajorityErrorsStatusAggregator` available
out of the box)
* `class DoobieOptionalResultFunctionWithStatus` - as `DoobieSingleResultFunctionWithStatus` but the returning record
is optional

Since Doobie also interoperates with ZIO, there is an example of how a database connection can be properly established
within a ZIO application. Please see [this file](doobie/zio-setup.md) for more details.

## Testing

Expand Down
41 changes: 23 additions & 18 deletions core/src/main/scala/za/co/absa/fadb/DBEngine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.fadb

import cats.Monad
import cats.implicits.toFunctorOps
import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.fadb.status.FailedOrRow

import scala.language.higherKinds

Expand All @@ -38,47 +38,52 @@ abstract class DBEngine[F[_]: Monad] {

/**
* The actual query executioner of the queries of the engine
*
* Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries.
*
* @param query - the query to execute
* @tparam R - return type of the query
* @return - sequence of the results of database query
*/
protected def run[R](query: QueryType[R]): F[Seq[R]]

/**
* The actual query executioner of the queries of the engine with status
* @param query - the query to execute
* @tparam R - return type of the query
* @return - result of database query with status
*/
def runWithStatus[R](query: QueryWithStatusType[R]): F[Either[StatusException, R]]
protected def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]]

/**
* Public method to execute when query is expected to return multiple results
*
* Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries.
*
* @param query - the query to execute
* @tparam R - return type of the query
* @return - sequence of the results of database query
*/
def fetchAll[R](query: QueryType[R]): F[Seq[R]] = {
run(query)
}
def fetchAll[R](query: QueryType[R]): F[Seq[R]] = run(query)
def fetchAllWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]] =
runWithStatus(query)

/**
* Public method to execute when query is expected to return exactly one row
*
* Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries.
*
* @param query - the query to execute
* @tparam R - return type of the query
* @return - sequence of the results of database query
*/
def fetchHead[R](query: QueryType[R]): F[R] = {
run(query).map(_.head)
}
def fetchHead[R](query: QueryType[R]): F[R] = run(query).map(_.head)
def fetchHeadWithStatus[R](query: QueryWithStatusType[R]): F[FailedOrRow[R]] =
runWithStatus(query).map(_.head)

/**
* Public method to execute when query is expected to return one or no results
*
* Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries.
*
* @param query - the query to execute
* @tparam R - return type of the query
* @return - sequence of the results of database query
*/
def fetchHeadOption[R](query: QueryType[R]): F[Option[R]] = {
run(query).map(_.headOption)
}
def fetchHeadOption[R](query: QueryType[R]): F[Option[R]] = run(query).map(_.headOption)
def fetchHeadOptionWithStatus[R](query: QueryWithStatusType[R]): F[Option[FailedOrRow[R]]] =
runWithStatus(query).map(_.headOption)
}
143 changes: 134 additions & 9 deletions core/src/main/scala/za/co/absa/fadb/DBFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package za.co.absa.fadb

import cats.MonadError
import cats.implicits.toFlatMapOps
import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.fadb.status.aggregation.StatusAggregator
import za.co.absa.fadb.status.handling.StatusHandling
import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow, Row}

import scala.language.higherKinds

Expand Down Expand Up @@ -105,13 +106,28 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv
def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName))

/**
* Executes the database function and returns multiple results.
* @param values The values to pass over to the database function.
* @return A sequence of results from the database function.
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Either[StatusException, R]] = {
query(values).flatMap(q => dBEngine.runWithStatus(q))
}
* Executes the database function and returns multiple results.
* @param values - The values to pass over to the database function.
* @return - A sequence of results from the database function.
*/
protected def multipleResults(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[FailedOrRow[R]]] =
query(values).flatMap(q => dBEngine.fetchAllWithStatus(q))

/**
* Executes the database function and returns a single result.
* @param values - The values to pass over to the database function.
* @return - A single result from the database function.
*/
protected def singleResult(values: I)(implicit me: MonadError[F, Throwable]): F[FailedOrRow[R]] =
query(values).flatMap(q => dBEngine.fetchHeadWithStatus(q))

/**
* Executes the database function and returns an optional result.
* @param values - The values to pass over to the database function.
* @return - An optional result from the database function.
*/
protected def optionalResult(values: I)(implicit me: MonadError[F, Throwable]): F[Option[FailedOrRow[R]]] =
query(values).flatMap(q => dBEngine.fetchHeadOptionWithStatus(q))

/**
* The fields to select from the database function call
Expand All @@ -132,7 +148,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv
protected def query(values: I)(implicit me: MonadError[F, Throwable]): F[dBEngine.QueryWithStatusType[R]]

// To be provided by an implementation of QueryStatusHandling
override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A]
override def checkStatus[D](statusWithData: Row[D]): FailedOrRow[D]
}

object DBFunction {
Expand Down Expand Up @@ -206,4 +222,113 @@ object DBFunction {
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[R]] = optionalResult(values)
}

/**
* `DBMultipleResultFunctionWithStatus` is an abstract class that represents a database function returning
* multiple results with status information.
* It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a sequence of results.
*/
abstract class DBMultipleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)

// A constructor that allows specifying the function name as a string, but not as an option
def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName))

/**
* For easy and convenient execution of the DB function call
* @param values - the values to pass over to the database function
* @return - a sequence of values, each coming from a row returned from the DB function transformed to scala
* type `R` wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[FailedOrRow[R]]] = multipleResults(values)
}

/**
* `DBMultipleResultFunctionWithAggStatus` is an abstract class that represents a database function returning
* multiple results with status information.
* It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a sequence of results
*
* It's similar to `DBMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value.
* The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`.
*/
abstract class DBMultipleResultFunctionWithAggStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride)
with StatusAggregator {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)

// A constructor that allows specifying the function name as a string, but not as an option
def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName))

/**
* For easy and convenient execution of the DB function call
* @param values - the values to pass over to the database function
* @return - a sequence of values, each coming from a row returned from the DB function transformed to scala
* type `R` wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)
(implicit me: MonadError[F, Throwable]): F[FailedOrRows[R]] =
multipleResults(values).flatMap(data => me.pure(aggregate(data)))
}

/**
* `DBSingleResultFunctionWithStatus` is an abstract class that represents a database function returning
* a single result with status information.
* It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a single result.
*/
abstract class DBSingleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)

// A constructor that allows specifying the function name as a string, but not as an option
def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName))

/**
* For easy and convenient execution of the DB function call
* @param values - the values to pass over to the database function
* @return - the value returned from the DB function transformed to scala type `R`
* wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[FailedOrRow[R]] =
singleResult(values)
}

/**
* `DBOptionalResultFunctionWithStatus` is an abstract class that represents a database function returning
* an optional result with status information.
* It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return an optional result.
*/
abstract class DBOptionalResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)

// A constructor that allows specifying the function name as a string, but not as an option
def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName))

/**
* For easy and convenient execution of the DB function call
* @param values - the values to pass over to the database function
* @return - the value returned from the DB function transformed to scala type `R` if a row is returned,
* otherwise `None`, wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[FailedOrRow[R]]] =
optionalResult(values)
}

}
21 changes: 10 additions & 11 deletions core/src/main/scala/za/co/absa/fadb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.fadb

import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.fadb.status.{FailedOrRow, Row}

/**
* The basis for all query types of [[DBEngine]] implementations
Expand All @@ -26,32 +26,31 @@ trait Query[R]

/**
* The basis for all query types of [[DBEngine]] implementations with status
* @tparam A - the initial result type of the query
* @tparam B - the intermediate result type of the query
* @tparam R - the final return type of the query
* @tparam DS - the initial result type of the query (a row basically, having status-related columns as well)
* @tparam D - the intermediate result type of the query (a row without status columns, i.e. data only)
* @tparam R - the final return type of the query (final version of result, depending on the needs, might be the same as D)
*/
trait QueryWithStatus[A, B, R] {
trait QueryWithStatus[DS, D, R] {

/**
* Processes the status of the query and returns the status with data
* @param initialResult - the initial result of the query
* @return the status with data
* @return data with status
*/
def processStatus(initialResult: A): FunctionStatusWithData[B]
def processStatus(initialResult: DS): Row[D]

/**
* Converts the status with data to either a status exception or the data
* @param statusWithData - the status with data
* @return either a status exception or the data
*/
def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[B]): Either[StatusException, R]
def toStatusExceptionOrData(statusWithData: Row[D]): FailedOrRow[R]

/**
* Returns the result of the query or a status exception
* @param initialResult - the initial result of the query
* @return the result of the query or a status exception
*/
def getResultOrException(initialResult: A): Either[StatusException, R] = toStatusExceptionOrData(
processStatus(initialResult)
)
def getResultOrException(initialResult: DS): FailedOrRow[R] =
toStatusExceptionOrData(processStatus(initialResult))
}
Loading

0 comments on commit 01e0096

Please sign in to comment.