diff --git a/.github/workflows/jacoco_report.yml b/.github/workflows/jacoco_report.yml index e726ee807..ff49512e5 100644 --- a/.github/workflows/jacoco_report.yml +++ b/.github/workflows/jacoco_report.yml @@ -63,7 +63,7 @@ jobs: - name: Add coverage to PR (model) if: steps.jacocorun.outcome == 'success' id: jacoco-model - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -74,7 +74,7 @@ jobs: - name: Add coverage to PR (agent) if: steps.jacocorun.outcome == 'success' id: jacoco-agent - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/agent/target/spark3-jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -85,7 +85,7 @@ jobs: - name: Add coverage to PR (reader) if: steps.jacocorun.outcome == 'success' id: jacoco-reader - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/reader/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -96,7 +96,7 @@ jobs: - name: Add coverage to PR (server) if: steps.jacocorun.outcome == 'success' id: jacoco-server - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/server/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 565d582a6..ed2f466d0 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -13,10 +13,10 @@ * limitations under the License. 
*/ --- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT) CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( - IN i_partitioning JSONB, - IN i_limit INT DEFAULT 5, + IN i_partitioning_id BIGINT, + IN i_checkpoints_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, IN i_checkpoint_name TEXT DEFAULT NULL, OUT status INTEGER, OUT status_text TEXT, @@ -28,86 +28,106 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( OUT measured_columns TEXT[], OUT measurement_value JSONB, OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT has_more BOOLEAN ) - RETURNS SETOF record AS -$$ - ------------------------------------------------------------------------------- +RETURNS SETOF record AS +------------------------------------------------------------------------------- -- --- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT) --- Retrieves all checkpoints (measures and their measurement details) related to a +-- Function: runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) +-- Retrieves checkpoints (measures and their measurement details) related to a -- given partitioning (and checkpoint name, if specified). -- -- Parameters: -- i_partitioning - partitioning of requested checkpoints --- i_limit - (optional) maximum number of checkpoint's measurements to return --- if 0 specified, all data will be returned, i.e. 
no limit will be applied +-- i_checkpoints_limit - (optional) maximum number of checkpoints to return +-- i_offset - (optional) offset of the first checkpoint to return +-- i_checkpoint_name - (optional) name of the checkpoint + +-- Note: i_checkpoints_limit and i_offset are used for pagination purposes; +-- checkpoints are ordered by id_checkpoint and then by process_start_time, both +-- ascending (matches the ORDER BY clauses below; confirm start_time DESC was not intended) -- -- Returns: --- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name -- status - Status code -- status_text - Status message -- id_checkpoint - ID of the checkpoint -- checkpoint_name - Name of the checkpoint -- author - Author of the checkpoint --- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent -- measure_name - Name of the measure -- measure_columns - Columns of the measure -- measurement_value - Value of the measurement -- checkpoint_start_time - Time of the checkpoint -- checkpoint_end_time - End time of the checkpoint computation +-- has_more - Flag indicating whether there are more checkpoints available -- -- Status codes: -- 11 - OK -- 41 - Partitioning not found -- ------------------------------------------------------------------------------- +$$ DECLARE - _fk_partitioning BIGINT; + _has_more BOOLEAN; BEGIN - _fk_partitioning = runs._get_id_partitioning(i_partitioning); - - IF _fk_partitioning IS NULL THEN + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + IF NOT FOUND THEN status := 41; status_text := 'Partitioning not found'; RETURN NEXT; RETURN; END IF; + IF i_checkpoints_limit IS NOT NULL THEN + SELECT count(*) > i_offset + i_checkpoints_limit + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + -- no LIMIT/OFFSET here: they would apply to the single aggregated row, leaving _has_more NULL for i_offset > 0 + 
INTO _has_more; + ELSE + _has_more := false; + END IF; + RETURN QUERY + WITH limited_checkpoints AS ( + SELECT C.id_checkpoint, + C.checkpoint_name, + C.created_by, + C.measured_by_atum_agent, + C.process_start_time, + C.process_end_time + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.id_checkpoint, C.process_start_time + LIMIT i_checkpoints_limit OFFSET i_offset + ) SELECT 11 AS status, 'Ok' AS status_text, - C.id_checkpoint, - C.checkpoint_name, - C.created_by AS author, - C.measured_by_atum_agent, + LC.id_checkpoint, + LC.checkpoint_name, + LC.created_by AS author, + LC.measured_by_atum_agent, md.measure_name, md.measured_columns, M.measurement_value, - C.process_start_time AS checkpoint_start_time, - C.process_end_time AS checkpoint_end_time + LC.process_start_time AS checkpoint_start_time, + LC.process_end_time AS checkpoint_end_time, + _has_more AS has_more FROM - runs.checkpoints C - JOIN - runs.measurements M ON C.id_checkpoint = M.fk_checkpoint - JOIN + limited_checkpoints LC + INNER JOIN + runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint + INNER JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition - WHERE - C.fk_partitioning = _fk_partitioning - AND - (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) ORDER BY - C.process_start_time, - C.id_checkpoint - LIMIT nullif(i_limit, 0); - + LC.id_checkpoint, LC.process_start_time; END; $$ - LANGUAGE plpgsql VOLATILE SECURITY DEFINER; -ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner; - -GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner; - +ALTER FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git 
a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index 719391c3c..21d623126 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package za.co.absa.atum.database.runs import za.co.absa.balta.DBTestSuite @@ -7,7 +23,7 @@ import za.co.absa.balta.classes.setter.CustomDBType import java.time.OffsetDateTime import java.util.UUID -class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ +class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { private val fncGetPartitioningCheckpoints = "runs.get_partitioning_checkpoints" @@ -31,37 +47,26 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ |""".stripMargin ) - private val partitioning2 = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["key1", "key3", "key2", "key4"], - | "keysToValues": { - | "key1": "valueX", - | "key2": "valueY", - | "key3": "valueZ", - | "key4": "valueA" - | } - |} - |""".stripMargin - ) - - private val i_limit = 10 + private val i_checkpoints_limit = 1 + private val i_offset = 0 private val i_checkpoint_name = "checkpoint_1" private val measurement1 = JsonBString("""1""".stripMargin) private val measurement2 = JsonBString("""2""".stripMargin) - private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]") + private val measured_columns1 = CustomDBType("""{"col1"}""", "TEXT[]") + private val measured_columns2 = CustomDBType("""{"col2"}""", "TEXT[]") - test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") { + private val id_measure_definition1: Long = 1 + private val id_measure_definition2: Long = 2 - val uuid = UUID.randomUUID - val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val id_measure_definition: Long = 1 + private val uuid1 = UUID.fromString("d56fa5e2-79af-4a08-8b0c-6f83ff12cb2c") + private val uuid2 = UUID.fromString("6e42d61e-5cfa-45c1-9d0d-e1f3120107da") + private val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + private val startTime2 = OffsetDateTime.parse("1993-08-03T10:00:00Z") + private val endTime = 
OffsetDateTime.parse("2022-11-05T08:00:00Z") + test("Returns expected results when there is two measurements for one checkpoint") { table("runs.partitionings").insert( add("partitioning", partitioning1) .add("created_by", "Daniel") @@ -71,77 +76,85 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .fieldValue("partitioning", partitioning1, "id_partitioning").get.get table("runs.checkpoints").insert( - add("id_checkpoint", uuid) + add("id_checkpoint", uuid1) .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_1") - .add("process_start_time", startTime) + .add("process_start_time", startTime1) .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) table("runs.measure_definitions").insert( - add("id_measure_definition", id_measure_definition) + add("id_measure_definition", id_measure_definition1) .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns1) + ) + + table("runs.measure_definitions").insert( + add("id_measure_definition", id_measure_definition2) + .add("fk_partitioning", fkPartitioning1) + .add("created_by", "Daniel") + .add("measure_name", "measure_2") + .add("measured_columns", measured_columns2) ) table("runs.measurements").insert( - add("fk_checkpoint", uuid) - .add("fk_measure_definition", id_measure_definition) + add("fk_checkpoint", uuid1) + .add("fk_measure_definition", id_measure_definition1) .add("measurement_value", measurement1) ) + table("runs.measurements").insert( + add("fk_checkpoint", uuid1) + .add("fk_measure_definition", id_measure_definition2) + .add("measurement_value", measurement2) + ) + function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", i_checkpoints_limit) 
+ .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) - assert(results.getString("measure_name").contains("measure_1")) - assert(!queryResult.hasNext) - } + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) - function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) - .execute { queryResult => assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - 
assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) + val result2 = queryResult.next() + assert(result2.getInt("status").contains(11)) + assert(result2.getString("status_text").contains("Ok")) + assert(result2.getUUID("id_checkpoint").contains(uuid1)) + assert(result2.getString("checkpoint_name").contains("checkpoint_1")) + assert(result2.getString("author").contains("Daniel")) + assert(result2.getBoolean("measured_by_atum_agent").contains(true)) + assert(result2.getString("measure_name").contains("measure_2")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result2.getJsonB("measurement_value").contains(measurement2)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result2.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) } } - test("Get partitioning checkpoints returns multiple checkpoints for partitioning with multiple checkpoints") { - - val uuid1 = UUID.randomUUID - val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime1 = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val uuid2 = UUID.randomUUID - val startTime2 = OffsetDateTime.parse("1995-05-15T12:00:00Z") - val endTime2 = OffsetDateTime.parse("2025-07-20T15:00:00Z") - - val id_measure_definition1: Long = 1 - val id_measure_definition2: Long = 2 - + test( + "Returns expected results when there is one measurement for each of two checkpoints and different filtration is applied" + ) { table("runs.partitionings").insert( add("partitioning", partitioning1) .add("created_by", "Daniel") @@ -155,7 +168,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ 
.add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_1") .add("process_start_time", startTime1) - .add("process_end_time", endTime1) + .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) @@ -165,7 +178,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_2") .add("process_start_time", startTime2) - .add("process_end_time", endTime2) + .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) @@ -175,7 +188,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns1) ) table("runs.measure_definitions").insert( @@ -183,7 +196,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_2") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns2) ) table("runs.measurements").insert( @@ -199,53 +212,123 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ ) function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", i_offset) .execute { queryResult => assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid2)) + assert(result1.getString("checkpoint_name").contains("checkpoint_2")) + assert(result1.getString("author").contains("Daniel")) + 
assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_2")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result1.getJsonB("measurement_value").contains(measurement2)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) + + val result2 = queryResult.next() + assert(result2.getInt("status").contains(11)) + assert(result2.getString("status_text").contains("Ok")) + assert(result2.getUUID("id_checkpoint").contains(uuid1)) + assert(result2.getString("checkpoint_name").contains("checkpoint_1")) + assert(result2.getString("author").contains("Daniel")) + assert(result2.getBoolean("measured_by_atum_agent").contains(true)) + assert(result2.getString("measure_name").contains("measure_1")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result2.getJsonB("measurement_value").contains(measurement1)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result2.getBoolean("has_more").contains(false)) + assert(!queryResult.hasNext) + } - // Check the first result - val results1 = queryResult.next() - assert(results1.getInt("status").contains(11)) - assert(results1.getString("status_text").contains("Ok")) - assert(results1.getString("checkpoint_name").contains("checkpoint_1")) - assert(results1.getUUID("id_checkpoint").contains(uuid1)) - assert(results1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) - assert(results1.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) - assert(results1.getJsonB("measurement_value").contains(measurement1)) - assert(results1.getString("measure_name").contains("measure_1")) - - 
// Check the second result + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", i_offset) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => assert(queryResult.hasNext) - val results2 = queryResult.next() - assert(results2.getInt("status").contains(11)) - assert(results2.getString("status_text").contains("Ok")) - assert(results2.getString("checkpoint_name").contains("checkpoint_2")) - assert(results2.getUUID("id_checkpoint").contains(uuid2)) - assert(results2.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) - assert(results2.getOffsetDateTime("checkpoint_end_time").contains(endTime2)) - assert(results2.getJsonB("measurement_value").contains(measurement2)) - assert(results2.getString("measure_name").contains("measure_2")) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) } - } - - test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") { - table("runs.partitionings").insert( - add("partitioning", partitioning2) - .add("created_by", "Daniel") - ) + 
function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", 1) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(!queryResult.hasNext) + } function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning2) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", 0L) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", 1) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => + assert(queryResult.hasNext) + val result = queryResult.next() + assert(result.getInt("status").contains(41)) + assert(result.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) + } + + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 1) + .setParam("i_offset", i_offset) + .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid2)) + assert(result1.getString("checkpoint_name").contains("checkpoint_2")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_2")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result1.getJsonB("measurement_value").contains(measurement2)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(true)) assert(!queryResult.hasNext) } + } + test("Returns expected status when partitioning not found"){ + function(fncGetPartitioningCheckpoints) + 
.setParam("i_partitioning_id", 1) + .setParam("i_checkpoints_limit", i_checkpoints_limit) + .setParam("i_offset", i_offset) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(41)) + assert(result1.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) + } } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 5e88951ae..fd0e9720f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.http.ApiPaths +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse._ import za.co.absa.atum.server.model._ import zio._ @@ -54,6 +55,17 @@ trait BaseController { effect.map(MultiSuccessResponse(_)) } + protected def mapToPaginatedResponse[A]( + limit: Int, + offset: Long, + effect: IO[ErrorResponse, PaginatedResult[A]] + ): IO[ErrorResponse, PaginatedResponse[A]] = { + effect.map { + case ResultHasMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = true)) + case ResultNoMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = false)) + } + } + // Root-anchored URL path // https://stackoverflow.com/questions/2005079/absolute-vs-relative-urls/78439286#78439286 protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = { diff --git 
a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index e72ee1ae3..84280e465 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -39,4 +39,11 @@ trait CheckpointController { checkpointId: UUID ): IO[ErrorResponse, SingleSuccessResponse[CheckpointV2DTO]] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] = None, + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index 9cbd50391..c2221eec8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -19,8 +19,8 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService -import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult} 
+import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import java.util.UUID @@ -63,6 +63,21 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che ) ) } + + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] = None + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[CheckpointV2DTO], PaginatedResult[CheckpointV2DTO]]( + checkpointService.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + ) + ) + } } object CheckpointControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index ac8d057f1..db212a249 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -41,13 +41,8 @@ trait PartitioningController { additionalDataPatchDTO: AdditionalDataPatchDTO ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] - def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] - def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] - def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index cdb457e8c..6bac1662a 100644 --- 
a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -50,16 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) atumContextDTOEffect } - override def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { - mapToMultiSuccessResponse( - serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( - partitioningService.getPartitioningCheckpoints(checkpointQueryDTO) - ) - ) - } - override def getPartitioningAdditionalDataV2( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala index 4901b867a..a76ed0ad1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala @@ -17,31 +17,31 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.implicits.toSqlInterpolator -import za.co.absa.atum.model.dto.CheckpointQueryDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs -import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} +import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB} import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import zio._ -import io.circe.syntax.EncoderOps import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import 
doobie.postgres.implicits._ -import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut} -import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstRowStatusAggregator import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values => + extends DoobieMultipleResultFunctionWithAggStatus[GetPartitioningCheckpointsArgs, Option[CheckpointItemFromDB], Task](args => Seq( - fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", - fr"${values.limit}", - fr"${values.checkpointName}" + fr"${args.partitioningId}", + fr"${args.limit}", + fr"${args.offset}", + fr"${args.checkpointName}" ) ) with StandardStatusHandling - with ByFirstErrorStatusAggregator { + with ByFirstRowStatusAggregator { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", @@ -52,11 +52,19 @@ class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngi "measured_columns", "measurement_value", "checkpoint_start_time", - "checkpoint_end_time" + "checkpoint_end_time", + "has_more" ) } object GetPartitioningCheckpoints { + case class GetPartitioningCheckpointsArgs( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ) + val layer: URLayer[PostgresDatabaseProvider, GetPartitioningCheckpoints] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala 
b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index d8c3d729f..46f66d095 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -23,8 +23,8 @@ import sttp.tapir.json.circe.jsonBody import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} -import sttp.tapir.{PublicEndpoint, endpoint} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} +import sttp.tapir.{PublicEndpoint, Validator, endpoint} import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} import java.util.UUID @@ -108,12 +108,17 @@ trait Endpoints extends BaseEndpoints { } protected val getPartitioningCheckpointsEndpointV2 - : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { + : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[ + CheckpointV2DTO + ], Any] = { apiV2.get - .in(GetPartitioningCheckpoints) - .in(jsonBody[CheckpointQueryDTO]) + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Checkpoints) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) - .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) + .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) } protected val getFlowCheckpointsEndpointV2 diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 
ea8d7d87b..452ff87ea 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -29,7 +29,7 @@ import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -80,7 +80,16 @@ trait Routes extends Endpoints with ServerOptions { CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointId) } ), - createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), + createServerEndpoint[ + (Long, Option[Int], Option[Long], Option[String]), + ErrorResponse, + PaginatedResponse[CheckpointV2DTO] + ]( + getPartitioningCheckpointsEndpointV2, + { case (partitioningId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + } + ), createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index 771b68efa..5f48c96b3 100644 --- 
a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.PaginatedResult import zio._ import zio.macros.accessible @@ -28,4 +29,10 @@ trait CheckpointRepository { def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] def writeCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[DatabaseError, Unit] def getCheckpointV2(partitioningId: Long, checkpointId: UUID): IO[DatabaseError, CheckpointV2DTO] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index f4ee58629..9f7608232 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -19,11 +19,13 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs -import 
za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV2} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError -import za.co.absa.atum.server.model.CheckpointItemFromDB +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult} import zio._ import zio.interop.catz.asyncInstance @@ -32,7 +34,8 @@ import java.util.UUID class CheckpointRepositoryImpl( writeCheckpointFn: WriteCheckpoint, writeCheckpointV2Fn: WriteCheckpointV2, - getCheckpointV2Fn: GetPartitioningCheckpointV2 + getCheckpointV2Fn: GetPartitioningCheckpointV2, + getPartitioningCheckpoints: GetPartitioningCheckpoints ) extends CheckpointRepository with BaseRepository { @@ -60,14 +63,47 @@ class CheckpointRepositoryImpl( } } + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getPartitioningCheckpoints(GetPartitioningCheckpointsArgs(partitioningId, limit, offset, checkpointName)), + "getPartitioningCheckpoints" + ) + .map(_.flatten) + .flatMap { checkpointItems => + ZIO + .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + checkpoints => + if (checkpointItems.nonEmpty && checkpointItems.head.hasMore) ResultHasMore(checkpoints) + else ResultNoMore(checkpoints) + ) + } + } + } object CheckpointRepositoryImpl { - val layer: URLayer[WriteCheckpoint with WriteCheckpointV2 with GetPartitioningCheckpointV2, CheckpointRepository] = ZLayer { - for { - writeCheckpoint <- ZIO.service[WriteCheckpoint] - 
writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] - getCheckpointV2 <- ZIO.service[GetPartitioningCheckpointV2] - } yield new CheckpointRepositoryImpl(writeCheckpoint, writeCheckpointV2, getCheckpointV2) - } + val layer: URLayer[ + WriteCheckpoint with WriteCheckpointV2 with GetPartitioningCheckpointV2 with GetPartitioningCheckpoints, + CheckpointRepository + ] = + ZLayer { + for { + writeCheckpoint <- ZIO.service[WriteCheckpoint] + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + getCheckpointV2 <- ZIO.service[GetPartitioningCheckpointV2] + getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] + } yield new CheckpointRepositoryImpl( + writeCheckpoint, + writeCheckpointV2, + getCheckpointV2, + getPartitioningCheckpoints + ) + } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index b35f908da..88ca0dbbb 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,7 +18,6 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB import zio.IO import zio.macros.accessible @@ -43,8 +42,6 @@ trait PartitioningRepository { additionalData: AdditionalDataPatchDTO ): IO[DatabaseError, AdditionalDataDTO] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] - def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 83fa2d92f..c6363589b 100644 
--- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -31,7 +31,6 @@ class PartitioningRepositoryImpl( getPartitioningMeasuresFn: GetPartitioningMeasures, getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, - getPartitioningCheckpointsFn: GetPartitioningCheckpoints, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2, getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById @@ -82,15 +81,6 @@ class PartitioningRepositoryImpl( ).map(_.map { case AdditionalDataFromDB(adName, adValue) => adName.get -> adValue }.toMap) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbMultipleResultCallWithAggregatedStatus( - getPartitioningCheckpointsFn(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[DatabaseError, AdditionalDataDTO] = { dbMultipleResultCallWithAggregatedStatus( getPartitioningAdditionalDataV2Fn(partitioningId), @@ -127,7 +117,6 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasures with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData - with GetPartitioningCheckpoints with GetPartitioningAdditionalDataV2 with GetPartitioningById with GetPartitioningMeasuresById, @@ -139,7 +128,6 @@ object PartitioningRepositoryImpl { getPartitioningMeasures <- ZIO.service[GetPartitioningMeasures] getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData] - getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- 
ZIO.service[GetPartitioningAdditionalDataV2] getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] @@ -149,7 +137,6 @@ object PartitioningRepositoryImpl { getPartitioningMeasures, getPartitioningAdditionalData, createOrUpdateAdditionalData, - getPartitioningCheckpoints, getPartitioningById, getPartitioningAdditionalDataV2, getPartitioningMeasuresV2 diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index 62cf7a231..d45c09e59 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -28,4 +29,10 @@ trait CheckpointService { def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] def getCheckpointV2(partitioningId: Long, checkpointId: UUID): IO[ServiceError, CheckpointV2DTO] def saveCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[ServiceError, Unit] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index 1da3a19f8..c0844f063 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -19,6 +19,7 @@ package 
za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.CheckpointRepository +import za.co.absa.atum.server.model.PaginatedResult import zio._ import java.util.UUID @@ -46,6 +47,17 @@ class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends ) } + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] = { + repositoryCall( + checkpointRepository.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName), + "getPartitioningCheckpoints" + ) + } } object CheckpointServiceImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index b47bd9652..7779a710d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -40,8 +40,6 @@ trait PartitioningService { additionalData: AdditionalDataPatchDTO ): IO[ServiceError, AdditionalDataDTO] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] - def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 6fcd7c966..2525568f9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ 
b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -18,9 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.atum.server.model.CheckpointFromDB import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -59,23 +57,6 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ServiceError, Seq[CheckpointDTO]] = { - for { - checkpointsFromDB <- repositoryCall( - partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => - ZIO - .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => GeneralServiceError(error.getMessage)) - } - } yield checkpointDTOs - - } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[ServiceError, AdditionalDataDTO] = { repositoryCall( partitioningRepository.getPartitioningAdditionalDataV2(partitioningId), diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala index 035e55fc9..3d8ca67c9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala @@ -31,7 +31,8 @@ case class CheckpointItemFromDB( measuredColumns: Seq[String], measurementValue: Json, // JSON representation of `MeasurementDTO` checkpointStartTime: ZonedDateTime, - checkpointEndTime: Option[ZonedDateTime] + checkpointEndTime: 
Option[ZonedDateTime], + hasMore: Boolean ) object CheckpointItemFromDB { @@ -71,4 +72,22 @@ object CheckpointItemFromDB { } } + def groupAndConvertItemsToCheckpointV2DTOs( + checkpointItems: Seq[CheckpointItemFromDB] + ): Either[DecodingFailure, Seq[CheckpointV2DTO]] = { + val groupedItems = checkpointItems.groupBy(_.idCheckpoint) + val orderedIds = checkpointItems.map(_.idCheckpoint).distinct + + val result = orderedIds.map { id => + CheckpointItemFromDB.fromItemsToCheckpointV2DTO(groupedItems(id)) + } + + val errors = result.collect { case Left(err) => err } + if (errors.nonEmpty) { + Left(errors.head) + } else { + Right(result.collect { case Right(dto) => dto }) + } + } + } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 61d0df6b0..aca4c0784 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -353,7 +353,8 @@ trait TestData { measuredColumns = checkpointV2DTO1.measurements.head.measure.measuredColumns, measurementValue = checkpointV2DTO1.measurements.head.result.asJson, checkpointStartTime = checkpointV2DTO1.processStartTime, - checkpointEndTime = checkpointV2DTO1.processEndTime + checkpointEndTime = checkpointV2DTO1.processEndTime, + hasMore = true ) protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala index 52d13e6c3..b4e8d33fa 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala @@ -22,8 +22,14 @@ import za.co.absa.atum.server.ConfigProviderTest import 
za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.CheckpointService +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse -import za.co.absa.atum.server.model.{ConflictErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse} +import za.co.absa.atum.server.model.{ + ConflictErrorResponse, + InternalServerErrorResponse, + NotFoundErrorResponse, + Pagination +} import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -38,23 +44,31 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { when(checkpointServiceMock.saveCheckpoint(checkpointDTO3)) .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) - private val partitioningId = 1L + private val partitioningId1 = 1L + private val partitioningId2 = 2L - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO1)).thenReturn(ZIO.unit) - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO2)) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO1)).thenReturn(ZIO.unit) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO2)) .thenReturn(ZIO.fail(GeneralServiceError("error in data"))) - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO3)) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO3)) .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) when(checkpointServiceMock.saveCheckpointV2(0L, checkpointV2DTO3)) .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO1.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO1.id)) .thenReturn(ZIO.succeed(checkpointV2DTO1)) - when(checkpointServiceMock.getCheckpointV2(partitioningId, 
checkpointV2DTO2.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO2.id)) .thenReturn(ZIO.fail(NotFoundServiceError("not found"))) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO3.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO3.id)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(checkpointServiceMock.getPartitioningCheckpoints(partitioningId1, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(checkpointServiceMock.getPartitioningCheckpoints(partitioningId2, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointV2DTO1)))) + when(checkpointServiceMock.getPartitioningCheckpoints(0L, Some(10), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) + private val checkpointServiceMockLayer = ZLayer.succeed(checkpointServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -80,11 +94,11 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { suite("PostCheckpointV2Suite")( test("Returns expected CheckpointDTO") { for { - result <- CheckpointController.postCheckpointV2(partitioningId, checkpointV2DTO1) + result <- CheckpointController.postCheckpointV2(partitioningId1, checkpointV2DTO1) } yield assertTrue( result._1.isInstanceOf[SingleSuccessResponse[CheckpointV2DTO]] && result._1.data == checkpointV2DTO1 - && result._2 == s"/api/v2/partitionings/$partitioningId/checkpoints/${checkpointV2DTO1.id}" + && result._2 == s"/api/v2/partitionings/$partitioningId1/checkpoints/${checkpointV2DTO1.id}" ) }, test("Returns expected InternalServerErrorResponse") { @@ -106,19 +120,48 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { suite("GetPartitioningCheckpointV2Suite")( test("Returns expected CheckpointDTO") { for { - result <- CheckpointController.getPartitioningCheckpointV2(partitioningId, 
checkpointV2DTO1.id) + result <- CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO1.id) } yield assertTrue(result.data == checkpointV2DTO1) }, test("Returns expected NotFoundErrorResponse") { - assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO2.id).exit)( + assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO2.id).exit)( failsWithA[NotFoundErrorResponse] ) }, test("Returns expected InternalServerErrorResponse") { - assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO3.id).exit)( + assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO3.id).exit)( failsWithA[InternalServerErrorResponse] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is more data available") { + for { + result <- CheckpointController.getPartitioningCheckpoints( + partitioningId1, + limit = Some(10), + offset = Some(0) + ) + } yield assertTrue( + result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(10, 0, hasMore = true) + ) + }, + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is no more data available") { + for { + result <- CheckpointController.getPartitioningCheckpoints( + partitioningId2, + limit = Some(10), + offset = Some(0) + ) + } yield assertTrue( + result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(10, 0, hasMore = false) + ) + }, + test("Returns expected NotFoundErrorResponse when service returns NotFoundServiceError") { + assertZIO(CheckpointController.getPartitioningCheckpoints(0L, Some(10), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } ) ).provide( CheckpointControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala 
b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 2473bf490..0e249faee 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -17,7 +17,6 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError.{ConflictServiceError, GeneralServiceError, NotFoundServiceError} import za.co.absa.atum.server.api.service.PartitioningService @@ -55,13 +54,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.patchAdditionalData(2L, additionalDataPatchDTO1)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.succeed(Seq(checkpointDTO1, checkpointDTO2))) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.succeed(Seq.empty)) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO3)) - .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(partitioningServiceMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningServiceMock.getPartitioningAdditionalDataV2(2L)) @@ -132,23 +124,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Seq[MeasureDTO]") { - for { - result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO1) - } yield assertTrue(result.data == Seq(checkpointDTO1, checkpointDTO2)) - }, - test("Returns expected empty sequence") { - for { - result <- 
PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO2) - } yield assertTrue(result.data == Seq.empty[CheckpointDTO]) - }, - test("Returns expected InternalServerErrorResponse") { - assertZIO(PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO3).exit)( - failsWithA[InternalServerErrorResponse] - ) - } - ), suite("GetPartitioningSuite")( test("Returns expected PartitioningWithIdDTO") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala index ab38e39ca..0c8e12cf9 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala @@ -17,9 +17,9 @@ package za.co.absa.atum.server.api.database.runs.functions import za.co.absa.atum.server.ConfigProviderTest -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.db.fadb.exceptions.DataNotFoundException import za.co.absa.db.fadb.status.FunctionStatus import zio.interop.catz.asyncInstance @@ -29,23 +29,11 @@ import zio.test._ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { - - val partitioningDTO1: PartitioningDTO = Seq( - PartitionDTO("stringA", "stringA"), - PartitionDTO("stringB", "stringB") - ) - suite("GetPartitioningCheckpointsIntegrationTests")( - test("Returns expected sequence of Checkpoints with 
existing partitioning") { - val partitioningQueryDTO: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO1, - limit = Some(10), - checkpointName = Some("checkpointName") - ) - + test("Returns expected sequence of Checkpoints with non-existing partitioning id") { for { getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] - result <- getPartitioningCheckpoints(partitioningQueryDTO) + result <- getPartitioningCheckpoints(GetPartitioningCheckpointsArgs(0L, None, None, None)) } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala new file mode 100644 index 000000000..dd9fc89c7 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala @@ -0,0 +1,143 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.{UriContext, basicRequest} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.CheckpointController +import za.co.absa.atum.server.model.{NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.{Scope, ZIO, ZLayer} +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} + +import java.util.UUID + +object GetPartitioningCheckpointsEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val checkpointControllerMock = mock(classOf[CheckpointController]) + + private val uuid = UUID.randomUUID() + + when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(10, 0, hasMore = true), uuid))) + when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(20), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid))) + when(checkpointControllerMock.getPartitioningCheckpoints(2L, Some(10), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundErrorResponse("partitioning not found"))) + + private val checkpointControllerMockLayer = ZLayer.succeed(checkpointControllerMock) + + private val getPartitioningCheckpointServerEndpointV2 = getPartitioningCheckpointsEndpointV2 + .zServerLogic({ + case (partitioningId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + CheckpointController.getPartitioningCheckpoints(partitioningId, limit, 
offset, checkpointName) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[CheckpointController])) + .whenServerEndpoint(getPartitioningCheckpointServerEndpointV2) + .thenRunLogic() + .backend() + + suite("GetPartitioningCheckpointsEndpointSuite")( + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with more data available") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=10&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(10, 0, hasMore = true), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with no more data available") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=20&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns expected 404 when checkpoint data for a given ID doesn't exist") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/2/checkpoints?limit=10&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns expected 400 when limit is out of range") { + val request = basicRequest + 
.get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=1001&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + }, + test("Returns expected 400 when offset is negative") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=10&offset=-1") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + } + ) + }.provide( + checkpointControllerMockLayer + ) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index 797c4d82b..9f5568979 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -17,19 +17,21 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import 
za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException} import za.co.absa.db.fadb.status.FunctionStatus import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ - import za.co.absa.db.fadb.status.Row object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -37,6 +39,7 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) private val getCheckpointMockV2: GetPartitioningCheckpointV2 = mock(classOf[GetPartitioningCheckpointV2]) private val writeCheckpointV2Mock: WriteCheckpointV2 = mock(classOf[WriteCheckpointV2]) + private val getPartitioningCheckpointsMock: GetPartitioningCheckpoints = mock(classOf[GetPartitioningCheckpoints]) when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(writeCheckpointMock.apply(checkpointDTO2)) @@ -61,9 +64,17 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { when(getCheckpointMockV2.apply(GetPartitioningCheckpointV2Args(partitioningId, checkpointV2DTO3.id))) .thenReturn(ZIO.fail(new Exception("boom!"))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(0L, None, None, None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "ok"), Some(checkpointItemFromDB1))))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(1L, None, None, None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "ok"), None)))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(3L, None, None, None))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + private val writeCheckpointMockLayer = ZLayer.succeed(writeCheckpointMock) private val getCheckpointV2MockLayer = ZLayer.succeed(getCheckpointMockV2) private val writeCheckpointV2MockLayer = 
ZLayer.succeed(writeCheckpointV2Mock) + private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -121,12 +132,34 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralDatabaseError] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Seq") { + for { + result <- CheckpointRepository.getPartitioningCheckpoints(0L, None, None, None) + } yield assertTrue( + result.isInstanceOf[ResultHasMore[CheckpointV2DTO]] && result.data == Seq(checkpointV2DTO1) + ) + }, + test("Returns expected Seq.empty") { + for { + result <- CheckpointRepository.getPartitioningCheckpoints(1L, None, None, None) + } yield assertTrue( + result.isInstanceOf[ResultNoMore[CheckpointV2DTO]] && result.data == Seq.empty[CheckpointV2DTO] + ) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(CheckpointRepository.getPartitioningCheckpoints(3L, None, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) + } ) ).provide( CheckpointRepositoryImpl.layer, writeCheckpointMockLayer, writeCheckpointV2MockLayer, - getCheckpointV2MockLayer + getCheckpointV2MockLayer, + getPartitioningCheckpointsMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 8f3f3dc49..a56dd9604 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -100,16 +100,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) - // Get Partitioning Checkpoints Mocks - private val 
getPartitioningCheckpointsMock = mock(classOf[GetPartitioningCheckpoints]) - - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)) - .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1)))) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.right(Seq.empty)) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - - private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) - // Get Partitioning By Id Mocks private val getPartitioningByIdMock = mock(classOf[GetPartitioningById]) @@ -244,23 +234,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Seq") { - for { - result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue(result == Seq(checkpointFromDB1)) - }, - test("Returns expected DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( - failsWithA[DatabaseError] - ) - }, - test("Returns expected Seq.empty") { - for { - result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO3) - } yield assertTrue(result.isEmpty) - } - ), suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected AdditionalDataDTO instance") { for { @@ -330,7 +303,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMeasuresMockLayer, getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, - getPartitioningCheckpointsMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer, getPartitioningMeasuresV2MockLayer diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala index 
69b844bfc..3a88de682 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala @@ -21,6 +21,7 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.CheckpointRepository +import za.co.absa.atum.server.model.PaginatedResult.ResultHasMore import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -49,6 +50,11 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { when(checkpointRepositoryMock.getCheckpointV2(partitioningId, checkpointV2DTO2.id)) .thenReturn(ZIO.fail(NotFoundDatabaseError("not found"))) + when(checkpointRepositoryMock.getPartitioningCheckpoints(1L, None, None, None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(checkpointRepositoryMock.getPartitioningCheckpoints(0L, None, None, None)) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + private val checkpointRepositoryMockLayer = ZLayer.succeed(checkpointRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -104,6 +110,20 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundServiceError] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Right with Seq[CheckpointDTO]") { + for { + result <- CheckpointService.getPartitioningCheckpoints(1L, None, None, None) + } yield assertTrue { + result == ResultHasMore(Seq(checkpointV2DTO1)) + } + }, + test("Returns expected NotFoundServiceError") { + assertZIO(CheckpointService.getPartitioningCheckpoints(0L, None, None, None).exit)( + failsWithA[NotFoundServiceError] + ) + } ) ).provide( CheckpointServiceImpl.layer, diff --git 
a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index ffbe160f7..6776dde90 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -61,11 +61,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningAdditionalData(partitioningDTO2)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2))) - when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(2L)) @@ -176,20 +171,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Right with Seq[CheckpointDTO]") { - for { - result <- PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue { - result == Seq(checkpointDTO1, checkpointDTO2.copy(partitioning = checkpointDTO1.partitioning)) - } - }, - test("Returns expected ServiceError") { - assertZIO(PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( - failsWithA[ServiceError] - ) - } - ), suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected Right with AdditionalDataDTO") { for {