From 6b2df2429cb28ec86793f846007bf85131b45d11 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 5 Sep 2024 16:55:53 +0200 Subject: [PATCH 01/24] tmp commit --- .../V1.8.3__get_partitioning_checkpoints.sql | 78 +++-- ...titioningCheckpointsIntegrationTests.scala | 292 ++++++++++++------ .../api/controller/BaseController.scala | 12 + .../api/controller/CheckpointController.scala | 9 +- .../controller/CheckpointControllerImpl.scala | 17 +- .../controller/PartitioningController.scala | 6 +- .../PartitioningControllerImpl.scala | 18 +- .../GetPartitioningCheckpoints.scala | 30 +- .../absa/atum/server/api/http/Endpoints.scala | 24 +- .../co/absa/atum/server/api/http/Routes.scala | 13 +- .../api/repository/CheckpointRepository.scala | 7 + .../repository/CheckpointRepositoryImpl.scala | 56 +++- .../repository/PartitioningRepository.scala | 2 +- .../PartitioningRepositoryImpl.scala | 26 +- .../api/service/CheckpointService.scala | 7 + .../api/service/CheckpointServiceImpl.scala | 12 + .../api/service/PartitioningService.scala | 2 +- .../api/service/PartitioningServiceImpl.scala | 32 +- .../server/model/CheckpointItemFromDB.scala | 21 +- .../za/co/absa/atum/server/api/TestData.scala | 3 +- .../PartitioningControllerUnitTests.scala | 46 +-- ...titioningCheckpointsIntegrationTests.scala | 18 +- .../CheckpointRepositoryUnitTests.scala | 37 ++- .../PartitioningRepositoryUnitTests.scala | 29 +- .../service/CheckpointServiceUnitTests.scala | 20 ++ .../PartitioningServiceUnitTests.scala | 19 -- 26 files changed, 545 insertions(+), 291 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 565d582a6..a2e2bc33c 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -13,10 +13,10 @@ * limitations under the License. */ --- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT) CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( - IN i_partitioning JSONB, + IN i_partitioning_id BIGINT, IN i_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, IN i_checkpoint_name TEXT DEFAULT NULL, OUT status INTEGER, OUT status_text TEXT, @@ -28,46 +28,50 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( OUT measured_columns TEXT[], OUT measurement_value JSONB, OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT has_more BOOLEAN ) - RETURNS SETOF record AS -$$ - ------------------------------------------------------------------------------- +RETURNS SETOF record AS +------------------------------------------------------------------------------- -- --- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT) --- Retrieves all checkpoints (measures and their measurement details) related to a +-- Function: runs.get_partitioning_checkpoints(4) +-- Retrieves checkpoints (measures and their measurement details) related to a -- given partitioning (and checkpoint name, if specified). -- -- Parameters: -- i_partitioning - partitioning of requested checkpoints -- i_limit - (optional) maximum number of checkpoint's measurements to return -- if 0 specified, all data will be returned, i.e. no limit will be applied +-- i_offset - (optional) offset of the first checkpoint to return +-- i_checkpoint_name - (optional) name of the checkpoint + -- -- Returns: --- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name -- status - Status code -- status_text - Status message -- id_checkpoint - ID of the checkpoint -- checkpoint_name - Name of the checkpoint -- author - Author of the checkpoint --- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent -- measure_name - Name of the measure -- measure_columns - Columns of the measure -- measurement_value - Value of the measurement -- checkpoint_start_time - Time of the checkpoint -- checkpoint_end_time - End time of the checkpoint computation +-- has_more - Flag indicating whether there are more checkpoints available -- -- Status codes: -- 11 - OK -- 41 - Partitioning not found +-- 42 - No checkpoint data found -- ------------------------------------------------------------------------------- +$$ DECLARE - _fk_partitioning BIGINT; + v_found BOOLEAN := FALSE; BEGIN - _fk_partitioning = runs._get_id_partitioning(i_partitioning); - - IF _fk_partitioning IS NULL THEN + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + IF NOT FOUND THEN status := 41; status_text := 'Partitioning not found'; RETURN NEXT; @@ -75,6 +79,16 @@ BEGIN END IF; RETURN QUERY + WITH limited_checkpoints AS ( + SELECT DISTINCT C.id_checkpoint, + C.process_start_time, + ROW_NUMBER() OVER (ORDER BY C.process_start_time DESC, C.id_checkpoint) AS rn + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.process_start_time DESC, C.id_checkpoint + LIMIT i_limit + 1 OFFSET i_offset + ) SELECT 11 AS status, 'Ok' AS status_text, @@ -86,28 +100,38 @@ BEGIN md.measured_columns, M.measurement_value, C.process_start_time AS checkpoint_start_time, - C.process_end_time AS checkpoint_end_time + C.process_end_time AS checkpoint_end_time, + (SELECT COUNT(*) > i_limit FROM limited_checkpoints) AS has_more FROM runs.checkpoints C - JOIN + JOIN runs.measurements M ON C.id_checkpoint = M.fk_checkpoint - JOIN + JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition WHERE - C.fk_partitioning = _fk_partitioning - AND - (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + C.id_checkpoint IN (SELECT LC.id_checkpoint FROM limited_checkpoints as LC WHERE LC.rn <= i_limit) ORDER BY C.process_start_time, - C.id_checkpoint - LIMIT nullif(i_limit, 0); + C.id_checkpoint; + IF NOT FOUND THEN + status := 42; + status_text := 'No checkpoint data found'; + id_checkpoint := NULL; + checkpoint_name := NULL; + author := NULL; + measured_by_atum_agent := NULL; + measure_name := NULL; + measured_columns := NULL; + measurement_value := NULL; + checkpoint_start_time := NULL; + checkpoint_end_time := NULL; + has_more := FALSE; + RETURN NEXT; + END IF; END; $$ - LANGUAGE plpgsql VOLATILE SECURITY DEFINER; -ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner; - -GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner; - +ALTER FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index 719391c3c..98e285e5f 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package za.co.absa.atum.database.runs import za.co.absa.balta.DBTestSuite @@ -7,7 +23,7 @@ import za.co.absa.balta.classes.setter.CustomDBType import java.time.OffsetDateTime import java.util.UUID -class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ +class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { private val fncGetPartitioningCheckpoints = "runs.get_partitioning_checkpoints" @@ -31,37 +47,26 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ |""".stripMargin ) - private val partitioning2 = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["key1", "key3", "key2", "key4"], - | "keysToValues": { - | "key1": "valueX", - | "key2": "valueY", - | "key3": "valueZ", - | "key4": "valueA" - | } - |} - |""".stripMargin - ) - - private val i_limit = 10 + private val i_limit = 1 + private val i_offset = 0 private val i_checkpoint_name = "checkpoint_1" private val measurement1 = JsonBString("""1""".stripMargin) private val measurement2 = JsonBString("""2""".stripMargin) - private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]") + private val measured_columns1 = CustomDBType("""{"col1"}""", "TEXT[]") + private val measured_columns2 = CustomDBType("""{"col2"}""", "TEXT[]") - test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") { + private val id_measure_definition1: Long = 1 + private val id_measure_definition2: Long = 2 - val uuid = UUID.randomUUID - val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val id_measure_definition: Long = 1 + private val uuid1 = UUID.randomUUID + private val uuid2 = UUID.randomUUID + private val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + private val startTime2 = OffsetDateTime.parse("1993-08-03T10:00:00Z") + private val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") + test("Returns expected results when there is two measurements for one checkpoint") { table("runs.partitionings").insert( add("partitioning", partitioning1) .add("created_by", "Daniel") @@ -71,77 +76,85 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .fieldValue("partitioning", partitioning1, "id_partitioning").get.get table("runs.checkpoints").insert( - add("id_checkpoint", uuid) + add("id_checkpoint", uuid1) .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_1") - .add("process_start_time", startTime) + .add("process_start_time", startTime1) .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) table("runs.measure_definitions").insert( - add("id_measure_definition", id_measure_definition) + add("id_measure_definition", id_measure_definition1) .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns1) + ) + + table("runs.measure_definitions").insert( + add("id_measure_definition", id_measure_definition2) + .add("fk_partitioning", fkPartitioning1) + .add("created_by", "Daniel") + .add("measure_name", "measure_2") + .add("measured_columns", measured_columns2) ) table("runs.measurements").insert( - add("fk_checkpoint", uuid) - .add("fk_measure_definition", id_measure_definition) + add("fk_checkpoint", uuid1) + .add("fk_measure_definition", id_measure_definition1) .add("measurement_value", measurement1) ) + table("runs.measurements").insert( + add("fk_checkpoint", uuid1) + .add("fk_measure_definition", id_measure_definition2) + .add("measurement_value", measurement2) + ) + function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) + .setParam("i_partitioning_id", fkPartitioning1) .setParam("i_limit", i_limit) + .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) - assert(results.getString("measure_name").contains("measure_1")) - assert(!queryResult.hasNext) - } + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) - function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) - .execute { queryResult => assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) + val result2 = queryResult.next() + assert(result2.getInt("status").contains(11)) + assert(result2.getString("status_text").contains("Ok")) + assert(result2.getUUID("id_checkpoint").contains(uuid1)) + assert(result2.getString("checkpoint_name").contains("checkpoint_1")) + assert(result2.getString("author").contains("Daniel")) + assert(result2.getBoolean("measured_by_atum_agent").contains(true)) + assert(result2.getString("measure_name").contains("measure_2")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result2.getJsonB("measurement_value").contains(measurement2)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result2.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) } } - test("Get partitioning checkpoints returns multiple checkpoints for partitioning with multiple checkpoints") { - - val uuid1 = UUID.randomUUID - val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime1 = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val uuid2 = UUID.randomUUID - val startTime2 = OffsetDateTime.parse("1995-05-15T12:00:00Z") - val endTime2 = OffsetDateTime.parse("2025-07-20T15:00:00Z") - - val id_measure_definition1: Long = 1 - val id_measure_definition2: Long = 2 - + test( + "Returns expected results when there is one measurement for each of two checkpoints and different filtration is applied" + ) { table("runs.partitionings").insert( add("partitioning", partitioning1) .add("created_by", "Daniel") @@ -155,7 +168,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_1") .add("process_start_time", startTime1) - .add("process_end_time", endTime1) + .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) @@ -165,7 +178,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_2") .add("process_start_time", startTime2) - .add("process_end_time", endTime2) + .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) @@ -175,7 +188,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns1) ) table("runs.measure_definitions").insert( @@ -183,7 +196,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_2") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns2) ) table("runs.measurements").insert( @@ -199,53 +212,128 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ ) function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_limit", 2) + .setParam("i_offset", i_offset) .execute { queryResult => assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) - // Check the first result - val results1 = queryResult.next() - assert(results1.getInt("status").contains(11)) - assert(results1.getString("status_text").contains("Ok")) - assert(results1.getString("checkpoint_name").contains("checkpoint_1")) - assert(results1.getUUID("id_checkpoint").contains(uuid1)) - assert(results1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) - assert(results1.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) - assert(results1.getJsonB("measurement_value").contains(measurement1)) - assert(results1.getString("measure_name").contains("measure_1")) - - // Check the second result assert(queryResult.hasNext) - val results2 = queryResult.next() - assert(results2.getInt("status").contains(11)) - assert(results2.getString("status_text").contains("Ok")) - assert(results2.getString("checkpoint_name").contains("checkpoint_2")) - assert(results2.getUUID("id_checkpoint").contains(uuid2)) - assert(results2.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) - assert(results2.getOffsetDateTime("checkpoint_end_time").contains(endTime2)) - assert(results2.getJsonB("measurement_value").contains(measurement2)) - assert(results2.getString("measure_name").contains("measure_2")) + val result2 = queryResult.next() + assert(result2.getInt("status").contains(11)) + assert(result2.getString("status_text").contains("Ok")) + assert(result2.getUUID("id_checkpoint").contains(uuid2)) + assert(result2.getString("checkpoint_name").contains("checkpoint_2")) + assert(result2.getString("author").contains("Daniel")) + assert(result2.getBoolean("measured_by_atum_agent").contains(true)) + assert(result2.getString("measure_name").contains("measure_2")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result2.getJsonB("measurement_value").contains(measurement2)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result2.getBoolean("has_more").contains(false)) + assert(!queryResult.hasNext) + } + + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_limit", 2) + .setParam("i_offset", i_offset) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) } - } - test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") { + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_limit", 2) + .setParam("i_offset", 1) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(queryResult.hasNext) + val result = queryResult.next() + assert(result.getInt("status").contains(42)) + assert(result.getString("status_text").contains("No checkpoint data found")) + assert(!queryResult.hasNext) + } + + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", 0L) + .setParam("i_limit", 2) + .setParam("i_offset", 1) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(queryResult.hasNext) + val result = queryResult.next() + assert(result.getInt("status").contains(41)) + assert(result.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) + } - table("runs.partitionings").insert( - add("partitioning", partitioning2) - .add("created_by", "Daniel") - ) + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_limit", 1) + .setParam("i_offset", i_offset) + .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid2)) + assert(result1.getString("checkpoint_name").contains("checkpoint_2")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_2")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result1.getJsonB("measurement_value").contains(measurement2)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(true)) + assert(!queryResult.hasNext) + } + } + test("Returns expected status when partitioning not found"){ function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning2) + .setParam("i_partitioning_id", 1) .setParam("i_limit", i_limit) + .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(41)) + assert(result1.getString("status_text").contains("Partitioning not found")) assert(!queryResult.hasNext) } - } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 5e88951ae..fd0e9720f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.http.ApiPaths +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse._ import za.co.absa.atum.server.model._ import zio._ @@ -54,6 +55,17 @@ trait BaseController { effect.map(MultiSuccessResponse(_)) } + protected def mapToPaginatedResponse[A]( + limit: Int, + offset: Long, + effect: IO[ErrorResponse, PaginatedResult[A]] + ): IO[ErrorResponse, PaginatedResponse[A]] = { + effect.map { + case ResultHasMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = true)) + case ResultNoMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = false)) + } + } + // Root-anchored URL path // https://stackoverflow.com/questions/2005079/absolute-vs-relative-urls/78439286#78439286 protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index e72ee1ae3..c3cd298af 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -39,4 +39,11 @@ trait CheckpointController { checkpointId: UUID ): IO[ErrorResponse, SingleSuccessResponse[CheckpointV2DTO]] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String], + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index 9cbd50391..b2e7add29 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService -import za.co.absa.atum.server.model.ErrorResponse +import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult, SuccessResponse} import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import zio._ @@ -63,6 +63,21 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che ) ) } + + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int] = Some(10), + offset: Option[Long] = Some(0), + checkpointName: Option[String] = None + ): IO[ErrorResponse, SuccessResponse.PaginatedResponse[CheckpointV2DTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[CheckpointV2DTO], PaginatedResult[CheckpointV2DTO]]( + checkpointService.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + ) + ) + } } object CheckpointControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 2d27efa3b..05e643c42 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -41,9 +41,9 @@ trait PartitioningController { additionalDataPatchDTO: AdditionalDataPatchDTO ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] - def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] +// def getPartitioningCheckpointsV2( +// checkpointQueryDTO: CheckpointQueryDTO +// ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 7b08e3bef..a9fd8a1c5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -55,15 +55,15 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) mapToSingleSuccessResponse(createPartitioningIfNotExistsV1(partitioningSubmitDTO)) } - override def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { - mapToMultiSuccessResponse( - serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( - partitioningService.getPartitioningCheckpoints(checkpointQueryDTO) - ) - ) - } +// override def getPartitioningCheckpointsV2( +// checkpointQueryDTO: CheckpointQueryDTO +// ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { +// mapToMultiSuccessResponse( +// serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( +// partitioningService.getPartitioningCheckpoints(checkpointQueryDTO) +// ) +// ) +// } override def getPartitioningAdditionalDataV2( partitioningId: Long diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala index 4901b867a..a76ed0ad1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala @@ -17,31 +17,31 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.implicits.toSqlInterpolator -import za.co.absa.atum.model.dto.CheckpointQueryDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs -import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} +import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB} import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import zio._ -import io.circe.syntax.EncoderOps import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import doobie.postgres.implicits._ -import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut} -import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstRowStatusAggregator import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values => + extends DoobieMultipleResultFunctionWithAggStatus[GetPartitioningCheckpointsArgs, Option[CheckpointItemFromDB], Task](args => Seq( - fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", - fr"${values.limit}", - fr"${values.checkpointName}" + fr"${args.partitioningId}", + fr"${args.limit}", + fr"${args.offset}", + fr"${args.checkpointName}" ) ) with StandardStatusHandling - with ByFirstErrorStatusAggregator { + with ByFirstRowStatusAggregator { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", @@ -52,11 +52,19 @@ class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngi "measured_columns", "measurement_value", "checkpoint_start_time", - "checkpoint_end_time" + "checkpoint_end_time", + "has_more" ) } object GetPartitioningCheckpoints { + case class GetPartitioningCheckpointsArgs( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ) + val layer: URLayer[PostgresDatabaseProvider, GetPartitioningCheckpoints] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index daa0a684f..fd3291318 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -23,7 +23,7 @@ import sttp.tapir.json.circe.jsonBody import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import sttp.tapir.{PublicEndpoint, endpoint} import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} @@ -101,13 +101,27 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } +// protected val getPartitioningCheckpointsEndpointV2 +// : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { +// apiV2.get +// .in(GetPartitioningCheckpoints) +// .in(jsonBody[CheckpointQueryDTO]) +// .out(statusCode(StatusCode.Ok)) +// .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) +// } + protected val getPartitioningCheckpointsEndpointV2 - : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { + : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[ + CheckpointV2DTO + ], Any] = { apiV2.get - .in(GetPartitioningCheckpoints) - .in(jsonBody[CheckpointQueryDTO]) + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Checkpoints) + .in(query[Option[Int]]("limit")) + .in(query[Option[Long]]("offset")) + .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) - .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) + .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) } protected val getFlowCheckpointsEndpointV2 diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 1177e36d1..81376ab75 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -31,7 +31,7 @@ import za.co.absa.atum.server.api.controller.{CheckpointController, FlowControll import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -82,7 +82,16 @@ trait Routes extends Endpoints with ServerOptions { CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointId) } ), - createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), + createServerEndpoint[ + (Long, Option[Int], Option[Long], Option[String]), + ErrorResponse, + PaginatedResponse[CheckpointV2DTO] + ]( + getPartitioningCheckpointsEndpointV2, + { case (partitioningId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + } + ), createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index 771b68efa..5f48c96b3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.PaginatedResult import zio._ import zio.macros.accessible @@ -28,4 +29,10 @@ trait CheckpointRepository { def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] def writeCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[DatabaseError, Unit] def getCheckpointV2(partitioningId: Long, checkpointId: UUID): IO[DatabaseError, CheckpointV2DTO] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index f4ee58629..9f7608232 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -19,11 +19,13 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs -import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV2} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError -import za.co.absa.atum.server.model.CheckpointItemFromDB +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult} import zio._ import zio.interop.catz.asyncInstance @@ -32,7 +34,8 @@ import java.util.UUID class CheckpointRepositoryImpl( writeCheckpointFn: WriteCheckpoint, writeCheckpointV2Fn: WriteCheckpointV2, - getCheckpointV2Fn: GetPartitioningCheckpointV2 + getCheckpointV2Fn: GetPartitioningCheckpointV2, + getPartitioningCheckpoints: GetPartitioningCheckpoints ) extends CheckpointRepository with BaseRepository { @@ -60,14 +63,47 @@ class CheckpointRepositoryImpl( } } + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getPartitioningCheckpoints(GetPartitioningCheckpointsArgs(partitioningId, limit, offset, checkpointName)), + "getPartitioningCheckpoints" + ) + .map(_.flatten) + .flatMap { checkpointItems => + ZIO + .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + checkpoints => + if (checkpointItems.nonEmpty && checkpointItems.head.hasMore) ResultHasMore(checkpoints) + else ResultNoMore(checkpoints) + ) + } + } + } object CheckpointRepositoryImpl { - val layer: URLayer[WriteCheckpoint with WriteCheckpointV2 with GetPartitioningCheckpointV2, CheckpointRepository] = ZLayer { - for { - writeCheckpoint <- ZIO.service[WriteCheckpoint] - writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] - getCheckpointV2 <- ZIO.service[GetPartitioningCheckpointV2] - } yield new CheckpointRepositoryImpl(writeCheckpoint, writeCheckpointV2, getCheckpointV2) - } + val layer: URLayer[ + WriteCheckpoint with WriteCheckpointV2 with GetPartitioningCheckpointV2 with GetPartitioningCheckpoints, + CheckpointRepository + ] = + ZLayer { + for { + writeCheckpoint <- ZIO.service[WriteCheckpoint] + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + getCheckpointV2 <- ZIO.service[GetPartitioningCheckpointV2] + getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] + } yield new CheckpointRepositoryImpl( + writeCheckpoint, + writeCheckpointV2, + getCheckpointV2, + getPartitioningCheckpoints + ) + } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 1e8cc84d7..3b3423b25 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -39,7 +39,7 @@ trait PartitioningRepository { additionalData: AdditionalDataPatchDTO ): IO[DatabaseError, AdditionalDataDTO] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] +// def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 63788ea1a..8c9738337 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -30,7 +30,7 @@ class PartitioningRepositoryImpl( getPartitioningMeasuresFn: GetPartitioningMeasures, getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, - getPartitioningCheckpointsFn: GetPartitioningCheckpoints, +// getPartitioningCheckpointsFn: GetPartitioningCheckpoints, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2 ) extends PartitioningRepository @@ -69,14 +69,14 @@ class PartitioningRepositoryImpl( ).map(_.map { case AdditionalDataFromDB(adName, adValue) => adName.get -> adValue }.toMap) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbMultipleResultCallWithAggregatedStatus( - getPartitioningCheckpointsFn(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - } +// override def getPartitioningCheckpoints( +// checkpointQueryDTO: CheckpointQueryDTO +// ): IO[DatabaseError, Seq[CheckpointFromDB]] = { +// dbMultipleResultCallWithAggregatedStatus( +// getPartitioningCheckpointsFn(checkpointQueryDTO), +// "getPartitioningCheckpoints" +// ) +// } override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[DatabaseError, AdditionalDataDTO] = { dbMultipleResultCallWithAggregatedStatus( @@ -106,9 +106,9 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasures with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData - with GetPartitioningCheckpoints +// with GetPartitioningCheckpoints with GetPartitioningAdditionalDataV2 - with GetPartitioningCheckpoints +// with GetPartitioningCheckpoints with GetPartitioningById, PartitioningRepository ] = ZLayer { @@ -117,7 +117,7 @@ object PartitioningRepositoryImpl { getPartitioningMeasures <- ZIO.service[GetPartitioningMeasures] getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData] - getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] +// getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2] } yield new PartitioningRepositoryImpl( @@ -125,7 +125,7 @@ object PartitioningRepositoryImpl { getPartitioningMeasures, getPartitioningAdditionalData, createOrUpdateAdditionalData, - getPartitioningCheckpoints, +// getPartitioningCheckpoints, getPartitioningById, getPartitioningAdditionalDataV2 ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index 62cf7a231..d45c09e59 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -28,4 +29,10 @@ trait CheckpointService { def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] def getCheckpointV2(partitioningId: Long, checkpointId: UUID): IO[ServiceError, CheckpointV2DTO] def saveCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[ServiceError, Unit] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index 1da3a19f8..c0844f063 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.CheckpointRepository +import za.co.absa.atum.server.model.PaginatedResult import zio._ import java.util.UUID @@ -46,6 +47,17 @@ class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends ) } + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] = { + repositoryCall( + checkpointRepository.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName), + "getPartitioningCheckpoints" + ) + } } object CheckpointServiceImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index ab699997f..84d866624 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -36,7 +36,7 @@ trait PartitioningService { additionalData: AdditionalDataPatchDTO ): IO[ServiceError, AdditionalDataDTO] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] +// def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 1925568b7..cbea30fe1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -50,22 +50,22 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ServiceError, Seq[CheckpointDTO]] = { - for { - checkpointsFromDB <- repositoryCall( - partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => - ZIO - .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => GeneralServiceError(error.getMessage)) - } - } yield checkpointDTOs - - } +// override def getPartitioningCheckpoints( +// checkpointQueryDTO: CheckpointQueryDTO +// ): IO[ServiceError, Seq[CheckpointDTO]] = { +// for { +// checkpointsFromDB <- repositoryCall( +// partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), +// "getPartitioningCheckpoints" +// ) +// checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => +// ZIO +// .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) +// .mapError(error => GeneralServiceError(error.getMessage)) +// } +// } yield checkpointDTOs +// +// } override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[ServiceError, AdditionalDataDTO] = { repositoryCall( diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala index 035e55fc9..71d0fe57f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala @@ -31,7 +31,8 @@ case class CheckpointItemFromDB( measuredColumns: Seq[String], measurementValue: Json, // JSON representation of `MeasurementDTO` checkpointStartTime: ZonedDateTime, - checkpointEndTime: Option[ZonedDateTime] + checkpointEndTime: Option[ZonedDateTime], + hasMore: Boolean ) object CheckpointItemFromDB { @@ -71,4 +72,22 @@ object CheckpointItemFromDB { } } + def groupAndConvertItemsToCheckpointV2DTOs( + checkpointItems: Seq[CheckpointItemFromDB] + ): Either[DecodingFailure, Seq[CheckpointV2DTO]] = { + val groupedItems = checkpointItems.groupBy(_.idCheckpoint) + val orderedIds = checkpointItems.map(_.idCheckpoint).distinct + + val result = orderedIds.map { id => + CheckpointItemFromDB.fromItemsToCheckpointV2DTO(groupedItems(id)) + } + + val errors = result.collect { case Left(err) => err } + if (errors.nonEmpty) { + Left(errors.head) + } else { + Right(result.collect { case Right(dto) => dto }) + } + } + } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index d6c3995ad..c6684d214 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -341,7 +341,8 @@ trait TestData { measuredColumns = checkpointV2DTO1.measurements.head.measure.measuredColumns, measurementValue = checkpointV2DTO1.measurements.head.result.asJson, checkpointStartTime = checkpointV2DTO1.processStartTime, - checkpointEndTime = checkpointV2DTO1.processEndTime + checkpointEndTime = checkpointV2DTO1.processEndTime, + hasMore = true ) protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 3baa18639..6b38d7ded 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -48,12 +48,12 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.patchAdditionalData(2L, additionalDataPatchDTO1)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.succeed(Seq(checkpointDTO1, checkpointDTO2))) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.succeed(Seq.empty)) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO3)) - .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) +// when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO1)) +// .thenReturn(ZIO.succeed(Seq(checkpointDTO1, checkpointDTO2))) +// when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO2)) +// .thenReturn(ZIO.succeed(Seq.empty)) +// when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO3)) +// .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) when(partitioningServiceMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) @@ -104,23 +104,23 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Seq[MeasureDTO]") { - for { - result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO1) - } yield assertTrue(result.data == Seq(checkpointDTO1, checkpointDTO2)) - }, - test("Returns expected empty sequence") { - for { - result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO2) - } yield assertTrue(result.data == Seq.empty[CheckpointDTO]) - }, - test("Returns expected InternalServerErrorResponse") { - assertZIO(PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO3).exit)( - failsWithA[InternalServerErrorResponse] - ) - } - ), +// suite("GetPartitioningCheckpointsSuite")( +// test("Returns expected Seq[MeasureDTO]") { +// for { +// result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO1) +// } yield assertTrue(result.data == Seq(checkpointDTO1, checkpointDTO2)) +// }, +// test("Returns expected empty sequence") { +// for { +// result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO2) +// } yield assertTrue(result.data == Seq.empty[CheckpointDTO]) +// }, +// test("Returns expected InternalServerErrorResponse") { +// assertZIO(PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO3).exit)( +// failsWithA[InternalServerErrorResponse] +// ) +// } +// ), suite("GetPartitioningSuite")( test("Returns expected PartitioningWithIdDTO") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala index ab38e39ca..0c8e12cf9 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala @@ -17,9 +17,9 @@ package za.co.absa.atum.server.api.database.runs.functions import za.co.absa.atum.server.ConfigProviderTest -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.db.fadb.exceptions.DataNotFoundException import za.co.absa.db.fadb.status.FunctionStatus import zio.interop.catz.asyncInstance @@ -29,23 +29,11 @@ import zio.test._ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { - - val partitioningDTO1: PartitioningDTO = Seq( - PartitionDTO("stringA", "stringA"), - PartitionDTO("stringB", "stringB") - ) - suite("GetPartitioningCheckpointsIntegrationTests")( - test("Returns expected sequence of Checkpoints with existing partitioning") { - val partitioningQueryDTO: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO1, - limit = Some(10), - checkpointName = Some("checkpointName") - ) - + test("Returns expected sequence of Checkpoints with non-existing partitioning id") { for { getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] - result <- getPartitioningCheckpoints(partitioningQueryDTO) + result <- getPartitioningCheckpoints(GetPartitioningCheckpointsArgs(0L, None, None, None)) } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index 797c4d82b..9f5568979 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -17,19 +17,21 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException} import za.co.absa.db.fadb.status.FunctionStatus import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ - import za.co.absa.db.fadb.status.Row object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -37,6 +39,7 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) private val getCheckpointMockV2: GetPartitioningCheckpointV2 = mock(classOf[GetPartitioningCheckpointV2]) private val writeCheckpointV2Mock: WriteCheckpointV2 = mock(classOf[WriteCheckpointV2]) + private val getPartitioningCheckpointsMock: GetPartitioningCheckpoints = mock(classOf[GetPartitioningCheckpoints]) when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(writeCheckpointMock.apply(checkpointDTO2)) @@ -61,9 +64,17 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { when(getCheckpointMockV2.apply(GetPartitioningCheckpointV2Args(partitioningId, checkpointV2DTO3.id))) .thenReturn(ZIO.fail(new Exception("boom!"))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(0L, None, None, None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "ok"), Some(checkpointItemFromDB1))))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(1L, None, None, None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "ok"), None)))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(3L, None, None, None))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + private val writeCheckpointMockLayer = ZLayer.succeed(writeCheckpointMock) private val getCheckpointV2MockLayer = ZLayer.succeed(getCheckpointMockV2) private val writeCheckpointV2MockLayer = ZLayer.succeed(writeCheckpointV2Mock) + private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -121,12 +132,34 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralDatabaseError] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Seq") { + for { + result <- CheckpointRepository.getPartitioningCheckpoints(0L, None, None, None) + } yield assertTrue( + result.isInstanceOf[ResultHasMore[CheckpointV2DTO]] && result.data == Seq(checkpointV2DTO1) + ) + }, + test("Returns expected Seq.empty") { + for { + result <- CheckpointRepository.getPartitioningCheckpoints(1L, None, None, None) + } yield assertTrue( + result.isInstanceOf[ResultNoMore[CheckpointV2DTO]] && result.data == Seq.empty[CheckpointV2DTO] + ) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(CheckpointRepository.getPartitioningCheckpoints(3L, None, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) + } ) ).provide( CheckpointRepositoryImpl.layer, writeCheckpointMockLayer, writeCheckpointV2MockLayer, - getCheckpointV2MockLayer + getCheckpointV2MockLayer, + getPartitioningCheckpointsMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 520a77792..f5349e2e5 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -79,16 +79,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) - // Get Partitioning Checkpoints Mocks - private val getPartitioningCheckpointsMock = mock(classOf[GetPartitioningCheckpoints]) - - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)) - .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1)))) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.right(Seq.empty)) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - - private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) - // Get Partitioning By Id Mocks private val getPartitioningByIdMock = mock(classOf[GetPartitioningById]) @@ -187,23 +177,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Seq") { - for { - result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue(result == Seq(checkpointFromDB1)) - }, - test("Returns expected DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( - failsWithA[DatabaseError] - ) - }, - test("Returns expected Seq.empty") { - for { - result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO3) - } yield assertTrue(result.isEmpty) - } - ), suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected AdditionalDataDTO instance") { for { @@ -248,7 +221,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMeasuresMockLayer, getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, - getPartitioningCheckpointsMockLayer, +// getPartitioningCheckpointsMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer ) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala index 69b844bfc..3a88de682 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala @@ -21,6 +21,7 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.CheckpointRepository +import za.co.absa.atum.server.model.PaginatedResult.ResultHasMore import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -49,6 +50,11 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { when(checkpointRepositoryMock.getCheckpointV2(partitioningId, checkpointV2DTO2.id)) .thenReturn(ZIO.fail(NotFoundDatabaseError("not found"))) + when(checkpointRepositoryMock.getPartitioningCheckpoints(1L, None, None, None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(checkpointRepositoryMock.getPartitioningCheckpoints(0L, None, None, None)) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + private val checkpointRepositoryMockLayer = ZLayer.succeed(checkpointRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -104,6 +110,20 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundServiceError] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Right with Seq[CheckpointDTO]") { + for { + result <- CheckpointService.getPartitioningCheckpoints(1L, None, None, None) + } yield assertTrue { + result == ResultHasMore(Seq(checkpointV2DTO1)) + } + }, + test("Returns expected NotFoundServiceError") { + assertZIO(CheckpointService.getPartitioningCheckpoints(0L, None, None, None).exit)( + failsWithA[NotFoundServiceError] + ) + } ) ).provide( CheckpointServiceImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index c297ad18a..6ab307237 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -53,11 +53,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningAdditionalData(partitioningDTO2)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2))) - when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(2L)) @@ -140,20 +135,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Right with Seq[CheckpointDTO]") { - for { - result <- PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue { - result == Seq(checkpointDTO1, checkpointDTO2.copy(partitioning = checkpointDTO1.partitioning)) - } - }, - test("Returns expected ServiceError") { - assertZIO(PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( - failsWithA[ServiceError] - ) - } - ), suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected Right with AdditionalDataDTO") { for { From 4664180c88ed935b5cdb7425ece5999dd1b2a1a7 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 5 Sep 2024 16:56:16 +0200 Subject: [PATCH 02/24] tmp commit --- .../main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index a2e2bc33c..fd2629fb6 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -67,8 +67,6 @@ RETURNS SETOF record AS -- ------------------------------------------------------------------------------- $$ -DECLARE - v_found BOOLEAN := FALSE; BEGIN PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; IF NOT FOUND THEN From 099ce5a735bccda4c30432d90b22f6f14d9cecec Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 6 Sep 2024 10:06:09 +0200 Subject: [PATCH 03/24] sql change --- .../runs/V1.8.3__get_partitioning_checkpoints.sql | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index fd2629fb6..39a6def6e 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -40,8 +40,7 @@ RETURNS SETOF record AS -- -- Parameters: -- i_partitioning - partitioning of requested checkpoints --- i_limit - (optional) maximum number of checkpoint's measurements to return --- if 0 specified, all data will be returned, i.e. no limit will be applied +-- i_limit - (optional) maximum number of checkpoints to return -- i_offset - (optional) offset of the first checkpoint to return -- i_checkpoint_name - (optional) name of the checkpoint @@ -115,16 +114,6 @@ BEGIN IF NOT FOUND THEN status := 42; status_text := 'No checkpoint data found'; - id_checkpoint := NULL; - checkpoint_name := NULL; - author := NULL; - measured_by_atum_agent := NULL; - measure_name := NULL; - measured_columns := NULL; - measurement_value := NULL; - checkpoint_start_time := NULL; - checkpoint_end_time := NULL; - has_more := FALSE; RETURN NEXT; END IF; END; From 2bd1548b82df4e5fe1f45c81eb7ce50543b9e90e Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 6 Sep 2024 12:18:56 +0200 Subject: [PATCH 04/24] tests --- .../api/controller/CheckpointController.scala | 2 +- .../controller/CheckpointControllerImpl.scala | 10 +- .../absa/atum/server/api/http/Endpoints.scala | 13 +- .../atum/server/model/PaginatedResult.scala | 12 ++ .../CheckpointControllerUnitTests.scala | 69 ++++++++-- ...itioningCheckpointsEndpointUnitTests.scala | 119 ++++++++++++++++++ 6 files changed, 195 insertions(+), 30 deletions(-) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index c3cd298af..84280e465 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -43,7 +43,7 @@ trait CheckpointController { partitioningId: Long, limit: Option[Int], offset: Option[Long], - checkpointName: Option[String], + checkpointName: Option[String] = None, ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index b2e7add29..c2221eec8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -19,8 +19,8 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService -import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult, SuccessResponse} -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult} +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import java.util.UUID @@ -66,10 +66,10 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che override def getPartitioningCheckpoints( partitioningId: Long, - limit: Option[Int] = Some(10), - offset: Option[Long] = Some(0), + limit: Option[Int], + offset: Option[Long], checkpointName: Option[String] = None - ): IO[ErrorResponse, SuccessResponse.PaginatedResponse[CheckpointV2DTO]] = { + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] = { mapToPaginatedResponse( limit.get, offset.get, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index fd3291318..ef4419452 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -101,23 +101,14 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } -// protected val getPartitioningCheckpointsEndpointV2 -// : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { -// apiV2.get -// .in(GetPartitioningCheckpoints) -// .in(jsonBody[CheckpointQueryDTO]) -// .out(statusCode(StatusCode.Ok)) -// .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) -// } - protected val getPartitioningCheckpointsEndpointV2 : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[ CheckpointV2DTO ], Any] = { apiV2.get .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Checkpoints) - .in(query[Option[Int]]("limit")) - .in(query[Option[Long]]("offset")) + .in(query[Option[Int]]("limit").default(Some(10))) + .in(query[Option[Long]]("offset").default(Some(0L))) .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala index e3a2898e4..4f7887852 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala @@ -16,6 +16,9 @@ package za.co.absa.atum.server.model +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + sealed trait PaginatedResult[R] { def data: Seq[R] } @@ -23,6 +26,15 @@ sealed trait PaginatedResult[R] { object PaginatedResult { case class ResultHasMore[R](data: Seq[R]) extends PaginatedResult[R] + object ResultHasMore { + implicit def encoder[T: Encoder]: Encoder[ResultHasMore[T]] = deriveEncoder + implicit def decoder[T: Decoder]: Decoder[ResultHasMore[T]] = deriveDecoder + } + case class ResultNoMore[R](data: Seq[R]) extends PaginatedResult[R] + object ResultNoMore { + implicit def encoder[T: Encoder]: Encoder[ResultNoMore[T]] = deriveEncoder + implicit def decoder[T: Decoder]: Decoder[ResultNoMore[T]] = deriveDecoder + } } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala index 52d13e6c3..b4e8d33fa 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala @@ -22,8 +22,14 @@ import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.CheckpointService +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse -import za.co.absa.atum.server.model.{ConflictErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse} +import za.co.absa.atum.server.model.{ + ConflictErrorResponse, + InternalServerErrorResponse, + NotFoundErrorResponse, + Pagination +} import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -38,23 +44,31 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { when(checkpointServiceMock.saveCheckpoint(checkpointDTO3)) .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) - private val partitioningId = 1L + private val partitioningId1 = 1L + private val partitioningId2 = 2L - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO1)).thenReturn(ZIO.unit) - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO2)) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO1)).thenReturn(ZIO.unit) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO2)) .thenReturn(ZIO.fail(GeneralServiceError("error in data"))) - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO3)) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO3)) .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) when(checkpointServiceMock.saveCheckpointV2(0L, checkpointV2DTO3)) .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO1.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO1.id)) .thenReturn(ZIO.succeed(checkpointV2DTO1)) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO2.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO2.id)) .thenReturn(ZIO.fail(NotFoundServiceError("not found"))) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO3.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO3.id)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(checkpointServiceMock.getPartitioningCheckpoints(partitioningId1, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(checkpointServiceMock.getPartitioningCheckpoints(partitioningId2, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointV2DTO1)))) + when(checkpointServiceMock.getPartitioningCheckpoints(0L, Some(10), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) + private val checkpointServiceMockLayer = ZLayer.succeed(checkpointServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -80,11 +94,11 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { suite("PostCheckpointV2Suite")( test("Returns expected CheckpointDTO") { for { - result <- CheckpointController.postCheckpointV2(partitioningId, checkpointV2DTO1) + result <- CheckpointController.postCheckpointV2(partitioningId1, checkpointV2DTO1) } yield assertTrue( result._1.isInstanceOf[SingleSuccessResponse[CheckpointV2DTO]] && result._1.data == checkpointV2DTO1 - && result._2 == s"/api/v2/partitionings/$partitioningId/checkpoints/${checkpointV2DTO1.id}" + && result._2 == s"/api/v2/partitionings/$partitioningId1/checkpoints/${checkpointV2DTO1.id}" ) }, test("Returns expected InternalServerErrorResponse") { @@ -106,19 +120,48 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { suite("GetPartitioningCheckpointV2Suite")( test("Returns expected CheckpointDTO") { for { - result <- CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO1.id) + result <- CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO1.id) } yield assertTrue(result.data == checkpointV2DTO1) }, test("Returns expected NotFoundErrorResponse") { - assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO2.id).exit)( + assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO2.id).exit)( failsWithA[NotFoundErrorResponse] ) }, test("Returns expected InternalServerErrorResponse") { - assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO3.id).exit)( + assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO3.id).exit)( failsWithA[InternalServerErrorResponse] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is more data available") { + for { + result <- CheckpointController.getPartitioningCheckpoints( + partitioningId1, + limit = Some(10), + offset = Some(0) + ) + } yield assertTrue( + result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(10, 0, hasMore = true) + ) + }, + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is no more data available") { + for { + result <- CheckpointController.getPartitioningCheckpoints( + partitioningId2, + limit = Some(10), + offset = Some(0) + ) + } yield assertTrue( + result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(10, 0, hasMore = false) + ) + }, + test("Returns expected NotFoundErrorResponse when service returns NotFoundServiceError") { + assertZIO(CheckpointController.getPartitioningCheckpoints(0L, Some(10), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } ) ).provide( CheckpointControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala new file mode 100644 index 000000000..ebd4638c6 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.{UriContext, basicRequest} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.CheckpointController +import za.co.absa.atum.server.model.{NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.{Scope, ZIO, ZLayer} +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} + +import java.util.UUID + +object GetPartitioningCheckpointsEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val checkpointControllerMock = mock(classOf[CheckpointController]) + + private val uuid = UUID.randomUUID() + + when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(10, 0, hasMore = true), uuid))) + when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(20), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid))) + when(checkpointControllerMock.getPartitioningCheckpoints(2L, Some(10), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundErrorResponse("not found checkpoint data for a given ID"))) + + private val checkpointControllerMockLayer = ZLayer.succeed(checkpointControllerMock) + + private val getPartitioningCheckpointServerEndpointV2 = getPartitioningCheckpointsEndpointV2 + .zServerLogic({ + case (partitioningId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[CheckpointController])) + .whenServerEndpoint(getPartitioningCheckpointServerEndpointV2) + .thenRunLogic() + .backend() + + suite("GetPartitioningCheckpointsEndpointSuite")( + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with more data available") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=10&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(10, 0, hasMore = true), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with no more data available") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=20&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns expected 404 when checkpoint data for a given ID doesn't exist") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/2/checkpoints?limit=10&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + } + ) + }.provide( + checkpointControllerMockLayer + ) + +} From e4cf419f0336412dc7a15e3a6b5e847936194dbc Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 6 Sep 2024 12:30:38 +0200 Subject: [PATCH 05/24] tests --- .../controller/PartitioningController.scala | 6 +----- .../PartitioningControllerImpl.scala | 12 +----------- .../repository/PartitioningRepository.scala | 3 --- .../PartitioningRepositoryImpl.scala | 16 +--------------- .../api/service/PartitioningService.scala | 2 -- .../api/service/PartitioningServiceImpl.scala | 19 ------------------- 6 files changed, 3 insertions(+), 55 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 05e643c42..589fa5db6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import zio.IO import zio.macros.accessible @@ -41,10 +41,6 @@ trait PartitioningController { additionalDataPatchDTO: AdditionalDataPatchDTO ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] -// def getPartitioningCheckpointsV2( -// checkpointQueryDTO: CheckpointQueryDTO -// ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] - def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index a9fd8a1c5..6f512cbc7 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -19,8 +19,8 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.service.PartitioningService +import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import zio._ class PartitioningControllerImpl(partitioningService: PartitioningService) @@ -55,16 +55,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) mapToSingleSuccessResponse(createPartitioningIfNotExistsV1(partitioningSubmitDTO)) } -// override def getPartitioningCheckpointsV2( -// checkpointQueryDTO: CheckpointQueryDTO -// ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { -// mapToMultiSuccessResponse( -// serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( -// partitioningService.getPartitioningCheckpoints(checkpointQueryDTO) -// ) -// ) -// } - override def getPartitioningAdditionalDataV2( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 3b3423b25..1ddd37e7d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,7 +18,6 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB import zio.IO import zio.macros.accessible @@ -39,8 +38,6 @@ trait PartitioningRepository { additionalData: AdditionalDataPatchDTO ): IO[DatabaseError, AdditionalDataDTO] -// def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] - def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 8c9738337..a8b7e9ddc 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -20,7 +20,7 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB, PartitioningFromDB} +import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, MeasureFromDB, PartitioningFromDB} import zio._ import zio.interop.catz.asyncInstance import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError @@ -30,7 +30,6 @@ class PartitioningRepositoryImpl( getPartitioningMeasuresFn: GetPartitioningMeasures, getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, -// getPartitioningCheckpointsFn: GetPartitioningCheckpoints, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2 ) extends PartitioningRepository @@ -69,15 +68,6 @@ class PartitioningRepositoryImpl( ).map(_.map { case AdditionalDataFromDB(adName, adValue) => adName.get -> adValue }.toMap) } -// override def getPartitioningCheckpoints( -// checkpointQueryDTO: CheckpointQueryDTO -// ): IO[DatabaseError, Seq[CheckpointFromDB]] = { -// dbMultipleResultCallWithAggregatedStatus( -// getPartitioningCheckpointsFn(checkpointQueryDTO), -// "getPartitioningCheckpoints" -// ) -// } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[DatabaseError, AdditionalDataDTO] = { dbMultipleResultCallWithAggregatedStatus( getPartitioningAdditionalDataV2Fn(partitioningId), @@ -106,9 +96,7 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasures with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData -// with GetPartitioningCheckpoints with GetPartitioningAdditionalDataV2 -// with GetPartitioningCheckpoints with GetPartitioningById, PartitioningRepository ] = ZLayer { @@ -117,7 +105,6 @@ object PartitioningRepositoryImpl { getPartitioningMeasures <- ZIO.service[GetPartitioningMeasures] getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData] -// getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2] } yield new PartitioningRepositoryImpl( @@ -125,7 +112,6 @@ object PartitioningRepositoryImpl { getPartitioningMeasures, getPartitioningAdditionalData, createOrUpdateAdditionalData, -// getPartitioningCheckpoints, getPartitioningById, getPartitioningAdditionalDataV2 ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 84d866624..47e62c524 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -36,8 +36,6 @@ trait PartitioningService { additionalData: AdditionalDataPatchDTO ): IO[ServiceError, AdditionalDataDTO] -// def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] - def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index cbea30fe1..e83113471 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -18,9 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.atum.server.model.CheckpointFromDB import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -50,23 +48,6 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } -// override def getPartitioningCheckpoints( -// checkpointQueryDTO: CheckpointQueryDTO -// ): IO[ServiceError, Seq[CheckpointDTO]] = { -// for { -// checkpointsFromDB <- repositoryCall( -// partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), -// "getPartitioningCheckpoints" -// ) -// checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => -// ZIO -// .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) -// .mapError(error => GeneralServiceError(error.getMessage)) -// } -// } yield checkpointDTOs -// -// } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[ServiceError, AdditionalDataDTO] = { repositoryCall( partitioningRepository.getPartitioningAdditionalDataV2(partitioningId), From d59d7da85f1a8ca6845eaff3301b47ae31ff1824 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 6 Sep 2024 12:33:53 +0200 Subject: [PATCH 06/24] conflicts --- .../server/api/repository/PartitioningRepositoryImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index ed78ee768..a4b21e1f1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -106,8 +106,8 @@ object PartitioningRepositoryImpl { with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData with GetPartitioningAdditionalDataV2 - with GetPartitioningById, - with GetPartitioningMeasuresById + with GetPartitioningById + with GetPartitioningMeasuresById, PartitioningRepository ] = ZLayer { for { From eb01623864d44ad90b7f1c7e44e8850771b50b28 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 6 Sep 2024 12:38:59 +0200 Subject: [PATCH 07/24] fex --- .../atum/server/api/controller/PartitioningController.scala | 3 +-- .../server/api/controller/PartitioningControllerImpl.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 85f4d0e11..9f29ab39d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -43,7 +43,6 @@ trait PartitioningController { def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] - def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index c0bc3d5fa..b9f713b0a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} import zio._ From 1b28b7ca7e42df4680fbd9be044d323c751920b1 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 6 Sep 2024 13:05:48 +0200 Subject: [PATCH 08/24] fex --- .../PartitioningControllerUnitTests.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 6b38d7ded..3e9348cdd 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -17,7 +17,6 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError.{GeneralServiceError, NotFoundServiceError} import za.co.absa.atum.server.api.service.PartitioningService @@ -48,13 +47,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.patchAdditionalData(2L, additionalDataPatchDTO1)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) -// when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO1)) -// .thenReturn(ZIO.succeed(Seq(checkpointDTO1, checkpointDTO2))) -// when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO2)) -// .thenReturn(ZIO.succeed(Seq.empty)) -// when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO3)) -// .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(partitioningServiceMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningServiceMock.getPartitioningAdditionalDataV2(2L)) @@ -104,23 +96,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), -// suite("GetPartitioningCheckpointsSuite")( -// test("Returns expected Seq[MeasureDTO]") { -// for { -// result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO1) -// } yield assertTrue(result.data == Seq(checkpointDTO1, checkpointDTO2)) -// }, -// test("Returns expected empty sequence") { -// for { -// result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO2) -// } yield assertTrue(result.data == Seq.empty[CheckpointDTO]) -// }, -// test("Returns expected InternalServerErrorResponse") { -// assertZIO(PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO3).exit)( -// failsWithA[InternalServerErrorResponse] -// ) -// } -// ), suite("GetPartitioningSuite")( test("Returns expected PartitioningWithIdDTO") { for { From 67cab0bbbc3b9b1ec69e7544d0ad1be487919a73 Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Thu, 12 Sep 2024 09:39:15 +0200 Subject: [PATCH 09/24] Update database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql Co-authored-by: Ladislav Sulak --- .../main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 39a6def6e..5a17fb38b 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -51,7 +51,7 @@ RETURNS SETOF record AS -- id_checkpoint - ID of the checkpoint -- checkpoint_name - Name of the checkpoint -- author - Author of the checkpoint --- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent -- measure_name - Name of the measure -- measure_columns - Columns of the measure -- measurement_value - Value of the measurement From dfc8ad96523915779eb177e343a4bf72b6d34023 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 12 Sep 2024 09:56:05 +0200 Subject: [PATCH 10/24] comments addressed --- .../runs/V1.8.3__get_partitioning_checkpoints.sql | 11 ++++++++--- .../absa/atum/server/model/CheckpointItemFromDB.scala | 4 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 5a17fb38b..8f11939a3 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -41,9 +41,14 @@ RETURNS SETOF record AS -- Parameters: -- i_partitioning - partitioning of requested checkpoints -- i_limit - (optional) maximum number of checkpoints to return +-- i_limit relates to amount of checkpoints returned, not the amount of rows returned +-- as usually there is more than one row per checkpoint -- i_offset - (optional) offset of the first checkpoint to return -- i_checkpoint_name - (optional) name of the checkpoint +-- Note: i_limit and i_offset are used for pagination purposes; +-- checkpoints are ordered by process_start_time in descending order +-- and then by id_checkpoint in ascending order -- -- Returns: -- status - Status code @@ -77,9 +82,9 @@ BEGIN RETURN QUERY WITH limited_checkpoints AS ( - SELECT DISTINCT C.id_checkpoint, - C.process_start_time, - ROW_NUMBER() OVER (ORDER BY C.process_start_time DESC, C.id_checkpoint) AS rn + SELECT C.id_checkpoint, + C.process_start_time, + ROW_NUMBER() OVER (ORDER BY C.process_start_time DESC, C.id_checkpoint) AS rn FROM runs.checkpoints C WHERE C.fk_partitioning = i_partitioning_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala index 71d0fe57f..3d8ca67c9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala @@ -73,8 +73,8 @@ object CheckpointItemFromDB { } def groupAndConvertItemsToCheckpointV2DTOs( - checkpointItems: Seq[CheckpointItemFromDB] - ): Either[DecodingFailure, Seq[CheckpointV2DTO]] = { + checkpointItems: Seq[CheckpointItemFromDB] + ): Either[DecodingFailure, Seq[CheckpointV2DTO]] = { val groupedItems = checkpointItems.groupBy(_.idCheckpoint) val orderedIds = checkpointItems.map(_.idCheckpoint).distinct From 1823dfc7a9fc54bc3733a7ea48f3c8f45ea36ee8 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 12:04:52 +0200 Subject: [PATCH 11/24] comments addressed --- .../V1.8.3__get_partitioning_checkpoints.sql | 41 ++++++++++--------- .../absa/atum/server/api/http/Endpoints.scala | 6 +-- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 8f11939a3..438099b2c 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -15,7 +15,7 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( IN i_partitioning_id BIGINT, - IN i_limit INT DEFAULT 5, + IN i_checkpoints_limit INT DEFAULT 5, IN i_offset BIGINT DEFAULT 0, IN i_checkpoint_name TEXT DEFAULT NULL, OUT status INTEGER, @@ -40,13 +40,11 @@ RETURNS SETOF record AS -- -- Parameters: -- i_partitioning - partitioning of requested checkpoints --- i_limit - (optional) maximum number of checkpoints to return --- i_limit relates to amount of checkpoints returned, not the amount of rows returned --- as usually there is more than one row per checkpoint +-- i_checkpoints_limit - (optional) maximum number of checkpoints to return -- i_offset - (optional) offset of the first checkpoint to return -- i_checkpoint_name - (optional) name of the checkpoint --- Note: i_limit and i_offset are used for pagination purposes; +-- Note: i_checkpoints_limit and i_offset are used for pagination purposes; -- checkpoints are ordered by process_start_time in descending order -- and then by id_checkpoint in ascending order -- @@ -71,6 +69,8 @@ RETURNS SETOF record AS -- ------------------------------------------------------------------------------- $$ +DECLARE + _has_more BOOLEAN; BEGIN PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; IF NOT FOUND THEN @@ -80,17 +80,19 @@ BEGIN RETURN; END IF; + IF i_checkpoints_limit IS NOT NULL THEN + SELECT count(1) > i_checkpoints_limit + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.process_start_time DESC, C.id_checkpoint + LIMIT i_checkpoints_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + RETURN QUERY - WITH limited_checkpoints AS ( - SELECT C.id_checkpoint, - C.process_start_time, - ROW_NUMBER() OVER (ORDER BY C.process_start_time DESC, C.id_checkpoint) AS rn - FROM runs.checkpoints C - WHERE C.fk_partitioning = i_partitioning_id - AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - ORDER BY C.process_start_time DESC, C.id_checkpoint - LIMIT i_limit + 1 OFFSET i_offset - ) SELECT 11 AS status, 'Ok' AS status_text, @@ -103,7 +105,7 @@ BEGIN M.measurement_value, C.process_start_time AS checkpoint_start_time, C.process_end_time AS checkpoint_end_time, - (SELECT COUNT(*) > i_limit FROM limited_checkpoints) AS has_more + _has_more AS has_more FROM runs.checkpoints C JOIN @@ -111,10 +113,11 @@ BEGIN JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition WHERE - C.id_checkpoint IN (SELECT LC.id_checkpoint FROM limited_checkpoints as LC WHERE LC.rn <= i_limit) + C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) ORDER BY - C.process_start_time, - C.id_checkpoint; + C.process_start_time DESC, C.id_checkpoint + LIMIT i_checkpoints_limit OFFSET i_offset; IF NOT FOUND THEN status := 42; diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 08e76264f..d478d97f3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -24,7 +24,7 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} -import sttp.tapir.{PublicEndpoint, endpoint} +import sttp.tapir.{PublicEndpoint, Validator, endpoint} import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} import java.util.UUID @@ -107,8 +107,8 @@ trait Endpoints extends BaseEndpoints { ], Any] = { apiV2.get .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Checkpoints) - .in(query[Option[Int]]("limit").default(Some(10))) - .in(query[Option[Long]]("offset").default(Some(0L))) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) From d3d98c86c6bd4860c113dbf9f6ebd6cae256293b Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 12:13:38 +0200 Subject: [PATCH 12/24] comments addressed --- ...titioningCheckpointsIntegrationTests.scala | 16 ++++++------- ...itioningCheckpointsEndpointUnitTests.scala | 24 +++++++++++++++++++ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index 98e285e5f..ce354779d 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -47,7 +47,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { |""".stripMargin ) - private val i_limit = 1 + private val i_checkpoints_limit = 1 private val i_offset = 0 private val i_checkpoint_name = "checkpoint_1" @@ -115,7 +115,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", fkPartitioning1) - .setParam("i_limit", i_limit) + .setParam("i_checkpoints_limit", i_checkpoints_limit) .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => @@ -213,7 +213,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", fkPartitioning1) - .setParam("i_limit", 2) + .setParam("i_checkpoints_limit", 2) .setParam("i_offset", i_offset) .execute { queryResult => assert(queryResult.hasNext) @@ -250,7 +250,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", fkPartitioning1) - .setParam("i_limit", 2) + .setParam("i_checkpoints_limit", 2) .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => @@ -274,7 +274,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", fkPartitioning1) - .setParam("i_limit", 2) + .setParam("i_checkpoints_limit", 2) .setParam("i_offset", 1) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => @@ -287,7 +287,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", 0L) - .setParam("i_limit", 2) + .setParam("i_checkpoints_limit", 2) .setParam("i_offset", 1) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => @@ -300,7 +300,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", fkPartitioning1) - .setParam("i_limit", 1) + .setParam("i_checkpoints_limit", 1) .setParam("i_offset", i_offset) .execute { queryResult => assert(queryResult.hasNext) @@ -324,7 +324,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { test("Returns expected status when partitioning not found"){ function(fncGetPartitioningCheckpoints) .setParam("i_partitioning_id", 1) - .setParam("i_limit", i_limit) + .setParam("i_checkpoints_limit", i_checkpoints_limit) .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala index ebd4638c6..12068d416 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala @@ -110,6 +110,30 @@ object GetPartitioningCheckpointsEndpointUnitTests extends ZIOSpecDefault with E val statusCode = response.map(_.code) assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns expected 400 when limit is out of range") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=1001&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + }, + test("Returns expected 400 when offset is negative") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=10&offset=-1") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) } ) }.provide( From 971630194414aa7fdd3dfbb36f348623089c4f59 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 12:21:47 +0200 Subject: [PATCH 13/24] comments addressed --- .../main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 438099b2c..a2039a9e7 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -85,6 +85,7 @@ BEGIN FROM runs.checkpoints C WHERE C.fk_partitioning = i_partitioning_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + GROUP BY C.process_start_time, C.id_checkpoint ORDER BY C.process_start_time DESC, C.id_checkpoint LIMIT i_checkpoints_limit + 1 OFFSET i_offset INTO _has_more; From 0f15e1ce1e58e05490e8d1cee9977ae8b9862d93 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 14:26:06 +0200 Subject: [PATCH 14/24] comments addressed --- .../V1.8.3__get_partitioning_checkpoints.sql | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index a2039a9e7..f41b02372 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -81,44 +81,50 @@ BEGIN END IF; IF i_checkpoints_limit IS NOT NULL THEN - SELECT count(1) > i_checkpoints_limit + SELECT count(*) > i_checkpoints_limit FROM runs.checkpoints C WHERE C.fk_partitioning = i_partitioning_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - GROUP BY C.process_start_time, C.id_checkpoint - ORDER BY C.process_start_time DESC, C.id_checkpoint - LIMIT i_checkpoints_limit + 1 OFFSET i_offset INTO _has_more; ELSE _has_more := false; END IF; RETURN QUERY + WITH limited_checkpoints AS ( + SELECT C.id_checkpoint, + C.checkpoint_name, + C.created_by, + C.measured_by_atum_agent, + C.process_start_time, + C.process_end_time + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.process_start_time DESC, C.id_checkpoint + LIMIT i_checkpoints_limit OFFSET i_offset + ) SELECT 11 AS status, 'Ok' AS status_text, - C.id_checkpoint, - C.checkpoint_name, - C.created_by AS author, - C.measured_by_atum_agent, + LC.id_checkpoint, + LC.checkpoint_name, + LC.created_by AS author, + LC.measured_by_atum_agent, md.measure_name, md.measured_columns, M.measurement_value, - C.process_start_time AS checkpoint_start_time, - C.process_end_time AS checkpoint_end_time, + LC.process_start_time AS checkpoint_start_time, + LC.process_end_time AS checkpoint_end_time, _has_more AS has_more FROM - runs.checkpoints C + limited_checkpoints LC JOIN - runs.measurements M ON C.id_checkpoint = M.fk_checkpoint + runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition - WHERE - C.fk_partitioning = i_partitioning_id - AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) ORDER BY - C.process_start_time DESC, C.id_checkpoint - LIMIT i_checkpoints_limit OFFSET i_offset; + LC.process_start_time, LC.id_checkpoint; IF NOT FOUND THEN status := 42; From 2bf339e8d02ac2d4b6718586c438cc2f483e760b Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 14:31:37 +0200 Subject: [PATCH 15/24] comments addressed --- .../server/api/repository/PartitioningRepositoryUnitTests.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 5975dca72..36c505f8d 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -256,7 +256,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMeasuresMockLayer, getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, -// getPartitioningCheckpointsMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer, getPartitioningMeasuresV2MockLayer From dd3b7e2a785960376f8a9614d177638117e668f5 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 14:46:42 +0200 Subject: [PATCH 16/24] fix --- .../main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index f41b02372..68a376de5 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -85,6 +85,7 @@ BEGIN FROM runs.checkpoints C WHERE C.fk_partitioning = i_partitioning_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + LIMIT i_checkpoints_limit + 1 OFFSET i_offset INTO _has_more; ELSE _has_more := false; From 611ea75c5c756a086f60188253a0d30408b5d9d3 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 17 Sep 2024 13:24:45 +0200 Subject: [PATCH 17/24] conflicts with master resolved --- .../api/controller/PartitioningControllerImpl.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index cdb457e8c..6bac1662a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -50,16 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) atumContextDTOEffect } - override def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { - mapToMultiSuccessResponse( - serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( - partitioningService.getPartitioningCheckpoints(checkpointQueryDTO) - ) - ) - } - override def getPartitioningAdditionalDataV2( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = { From 7811f3771281118178991b96580a2c8001c4ea80 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 18 Sep 2024 14:02:32 +0200 Subject: [PATCH 18/24] comments addressed --- .../runs/V1.8.3__get_partitioning_checkpoints.sql | 14 +++++++------- ...tPartitioningCheckpointsEndpointUnitTests.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 68a376de5..d7811ca24 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -64,8 +64,8 @@ RETURNS SETOF record AS -- -- Status codes: -- 11 - OK +-- 12 - OK with no checkpoints found -- 41 - Partitioning not found --- 42 - No checkpoint data found -- ------------------------------------------------------------------------------- $$ @@ -102,7 +102,7 @@ BEGIN FROM runs.checkpoints C WHERE C.fk_partitioning = i_partitioning_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - ORDER BY C.process_start_time DESC, C.id_checkpoint + ORDER BY C.id_checkpoint, C.process_start_time LIMIT i_checkpoints_limit OFFSET i_offset ) SELECT @@ -120,16 +120,16 @@ BEGIN _has_more AS has_more FROM limited_checkpoints LC - JOIN + INNER JOIN runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint - JOIN + INNER JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition ORDER BY - LC.process_start_time, LC.id_checkpoint; + LC.id_checkpoint, LC.process_start_time; IF NOT FOUND THEN - status := 42; - status_text := 'No checkpoint data found'; + status := 12; + status_text := 'OK with no checkpoints found'; RETURN NEXT; END IF; END; diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala index 12068d416..dd9fc89c7 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala @@ -45,7 +45,7 @@ object GetPartitioningCheckpointsEndpointUnitTests extends ZIOSpecDefault with E when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(20), Some(0), None)) .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid))) when(checkpointControllerMock.getPartitioningCheckpoints(2L, Some(10), Some(0), None)) - .thenReturn(ZIO.fail(NotFoundErrorResponse("not found checkpoint data for a given ID"))) + .thenReturn(ZIO.fail(NotFoundErrorResponse("partitioning not found"))) private val checkpointControllerMockLayer = ZLayer.succeed(checkpointControllerMock) From c33a136231583623ead194b9c98fec9f70945465 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 18 Sep 2024 14:14:11 +0200 Subject: [PATCH 19/24] tests fixed --- ...titioningCheckpointsIntegrationTests.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index ce354779d..53bed6461 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -220,29 +220,28 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { val result1 = queryResult.next() assert(result1.getInt("status").contains(11)) assert(result1.getString("status_text").contains("Ok")) - assert(result1.getUUID("id_checkpoint").contains(uuid1)) - assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getUUID("id_checkpoint").contains(uuid2)) + assert(result1.getString("checkpoint_name").contains("checkpoint_2")) assert(result1.getString("author").contains("Daniel")) assert(result1.getBoolean("measured_by_atum_agent").contains(true)) - assert(result1.getString("measure_name").contains("measure_1")) - assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) - assert(result1.getJsonB("measurement_value").contains(measurement1)) - assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getString("measure_name").contains("measure_2")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result1.getJsonB("measurement_value").contains(measurement2)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) assert(result1.getBoolean("has_more").contains(false)) - assert(queryResult.hasNext) val result2 = queryResult.next() assert(result2.getInt("status").contains(11)) assert(result2.getString("status_text").contains("Ok")) - assert(result2.getUUID("id_checkpoint").contains(uuid2)) - assert(result2.getString("checkpoint_name").contains("checkpoint_2")) + assert(result2.getUUID("id_checkpoint").contains(uuid1)) + assert(result2.getString("checkpoint_name").contains("checkpoint_1")) assert(result2.getString("author").contains("Daniel")) assert(result2.getBoolean("measured_by_atum_agent").contains(true)) - assert(result2.getString("measure_name").contains("measure_2")) - assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) - assert(result2.getJsonB("measurement_value").contains(measurement2)) - assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result2.getString("measure_name").contains("measure_1")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result2.getJsonB("measurement_value").contains(measurement1)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) assert(result2.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) @@ -280,8 +279,8 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { .execute { queryResult => assert(queryResult.hasNext) val result = queryResult.next() - assert(result.getInt("status").contains(42)) - assert(result.getString("status_text").contains("No checkpoint data found")) + assert(result.getInt("status").contains(12)) + assert(result.getString("status_text").contains("OK with no checkpoints found")) assert(!queryResult.hasNext) } From 4e0f6bac4ec31137e58501e2d77d20028cd1c50d Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 18 Sep 2024 17:02:07 +0200 Subject: [PATCH 20/24] tests deterministic --- .../runs/GetPartitioningCheckpointsIntegrationTests.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index 53bed6461..59bfac6c8 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -60,8 +60,8 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { private val id_measure_definition1: Long = 1 private val id_measure_definition2: Long = 2 - private val uuid1 = UUID.randomUUID - private val uuid2 = UUID.randomUUID + private val uuid1 = UUID.fromString("d56fa5e2-79af-4a08-8b0c-6f83ff12cb2c") + private val uuid2 = UUID.fromString("6e42d61e-5cfa-45c1-9d0d-e1f3120107da") private val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") private val startTime2 = OffsetDateTime.parse("1993-08-03T10:00:00Z") private val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") From e187745c676b81e25ad449bee8f61cce4790a4bb Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 18 Sep 2024 22:13:14 +0200 Subject: [PATCH 21/24] jacoco report action 1.7.1 --- .github/workflows/jacoco_report.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/jacoco_report.yml b/.github/workflows/jacoco_report.yml index e726ee807..ff49512e5 100644 --- a/.github/workflows/jacoco_report.yml +++ b/.github/workflows/jacoco_report.yml @@ -63,7 +63,7 @@ jobs: - name: Add coverage to PR (model) if: steps.jacocorun.outcome == 'success' id: jacoco-model - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -74,7 +74,7 @@ jobs: - name: Add coverage to PR (agent) if: steps.jacocorun.outcome == 'success' id: jacoco-agent - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/agent/target/spark3-jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -85,7 +85,7 @@ jobs: - name: Add coverage to PR (reader) if: steps.jacocorun.outcome == 'success' id: jacoco-reader - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/reader/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -96,7 +96,7 @@ jobs: - name: Add coverage to PR (server) if: steps.jacocorun.outcome == 'success' id: jacoco-server - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/server/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} From 92a20990948915ec1a35a958570ab7a274c9c633 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 18 Sep 2024 22:19:10 +0200 Subject: [PATCH 22/24] test --- .../absa/atum/server/model/PaginatedResult.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala index 4f7887852..fd0a9f539 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala @@ -26,15 +26,15 @@ sealed trait PaginatedResult[R] { object PaginatedResult { case class ResultHasMore[R](data: Seq[R]) extends PaginatedResult[R] - object ResultHasMore { - implicit def encoder[T: Encoder]: Encoder[ResultHasMore[T]] = deriveEncoder - implicit def decoder[T: Decoder]: Decoder[ResultHasMore[T]] = deriveDecoder - } +// object ResultHasMore { +// implicit def encoder[T: Encoder]: Encoder[ResultHasMore[T]] = deriveEncoder +// implicit def decoder[T: Decoder]: Decoder[ResultHasMore[T]] = deriveDecoder +// } case class ResultNoMore[R](data: Seq[R]) extends PaginatedResult[R] - object ResultNoMore { - implicit def encoder[T: Encoder]: Encoder[ResultNoMore[T]] = deriveEncoder - implicit def decoder[T: Decoder]: Decoder[ResultNoMore[T]] = deriveDecoder - } +// object ResultNoMore { +// implicit def encoder[T: Encoder]: Encoder[ResultNoMore[T]] = deriveEncoder +// implicit def decoder[T: Decoder]: Decoder[ResultNoMore[T]] = deriveDecoder +// } } From 1e14e10a309acf42e11c6dd31f1cf5dc8840ea30 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 18 Sep 2024 22:24:12 +0200 Subject: [PATCH 23/24] remove serde in paginated result --- .../co/absa/atum/server/model/PaginatedResult.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala index fd0a9f539..e3a2898e4 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala @@ -16,9 +16,6 @@ package za.co.absa.atum.server.model -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - sealed trait PaginatedResult[R] { def data: Seq[R] } @@ -26,15 +23,6 @@ sealed trait PaginatedResult[R] { object PaginatedResult { case class ResultHasMore[R](data: Seq[R]) extends PaginatedResult[R] -// object ResultHasMore { -// implicit def encoder[T: Encoder]: Encoder[ResultHasMore[T]] = deriveEncoder -// implicit def decoder[T: Decoder]: Decoder[ResultHasMore[T]] = deriveDecoder -// } - case class ResultNoMore[R](data: Seq[R]) extends PaginatedResult[R] -// object ResultNoMore { -// implicit def encoder[T: Encoder]: Encoder[ResultNoMore[T]] = deriveEncoder -// implicit def decoder[T: Decoder]: Decoder[ResultNoMore[T]] = deriveDecoder -// } } From 8811b4eeaef6f1823b55f18484de575e97118ee0 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 23 Sep 2024 12:03:16 +0200 Subject: [PATCH 24/24] no status when no results --- .../postgres/runs/V1.8.3__get_partitioning_checkpoints.sql | 7 ------- .../runs/GetPartitioningCheckpointsIntegrationTests.scala | 4 ---- 2 files changed, 11 deletions(-) diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index d7811ca24..ed2f466d0 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -64,7 +64,6 @@ RETURNS SETOF record AS -- -- Status codes: -- 11 - OK --- 12 - OK with no checkpoints found -- 41 - Partitioning not found -- ------------------------------------------------------------------------------- @@ -126,12 +125,6 @@ BEGIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition ORDER BY LC.id_checkpoint, LC.process_start_time; - - IF NOT FOUND THEN - status := 12; - status_text := 'OK with no checkpoints found'; - RETURN NEXT; - END IF; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index 59bfac6c8..21d623126 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -277,10 +277,6 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { .setParam("i_offset", 1) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => - assert(queryResult.hasNext) - val result = queryResult.next() - assert(result.getInt("status").contains(12)) - assert(result.getString("status_text").contains("OK with no checkpoints found")) assert(!queryResult.hasNext) }