From a1e822c5b7eb42d6fefc8e3d24799c31031e72be Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 10 Sep 2024 12:22:21 +0200 Subject: [PATCH 01/18] endpoint --- .../co/absa/atum/server/api/http/Endpoints.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index b0f7e507e..34332d247 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -23,7 +23,7 @@ import sttp.tapir.json.circe.jsonBody import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import sttp.tapir.{PublicEndpoint, endpoint} import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} @@ -137,6 +137,19 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } + protected val getFlowPartitioningsEndpointV2 + : PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[ + PartitioningWithIdDTO + ], Any] = { + apiV2.get + .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Partitionings) + .in(query[Option[Int]]("limit").default(Some(10))) + .in(query[Option[Long]]("offset").default(Some(0))) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } From 649f969eb17d5526a302105b585cc942337359d5 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 10 Sep 2024 14:19:57 +0200 Subject: [PATCH 02/18] get_flow_partitionings --- .../flows/V1.9.8__get_flow_partitionings.sql | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql new file mode 100644 index 000000000..4885940b2 --- /dev/null +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -0,0 +1,100 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION flows.get_flow_partitionings( + IN i_flow_id BIGINT, + IN i_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + OUT status INTEGER, + OUT status_text TEXT, + OUT id BIGINT, + OUT partitioning JSONB, + OUT author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +$$ +------------------------------------------------------------------------------- +-- +-- Function: flows.get_flow_partitionings(3) +-- Retrieves all partitionings associated with the input flow. +-- +-- Note: partitionings will be retrieved in ordered fashion, by created_at column from runs.partitionings table +-- +-- Parameters: +-- i_flow_id - flow id to use for identifying the partitionings that will be retrieved +-- i_limit - (optional) maximum number of partitionings to return, default is 5 +-- i_offset - (optional) offset to use for pagination, default is 0 +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- id - ID of retrieved partitioning +-- partitioning - Partitioning value +-- author - Author of the partitioning +-- has_more - Flag indicating if there are more partitionings available +-- +-- Status codes: +-- 11 - OK +-- 41 - Flow not found +-- 42 - Partitionings not found +-- +------------------------------------------------------------------------------- +BEGIN + -- Check if the flow exists in runs.flows table + PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Flow not found'; + RETURN NEXT; + RETURN; + END IF; + + RETURN QUERY + WITH limited_partitionings AS ( + SELECT P.id_partitioning, + P.created_at, + ROW_NUMBER() OVER (ORDER BY P.created_at DESC, P.id_partitioning) AS rn + FROM flows.partitioning_to_flow PF + JOIN runs.partitionings P ON PF.fk_partitioning = P.id_partitioning + WHERE PF.fk_flow = i_flow_id + ORDER BY P.created_at DESC, P.id_partitioning + LIMIT i_limit + 1 OFFSET i_offset + ) + SELECT + 11 AS status, + 'OK' AS status_text, + P.id_partitioning AS id, + P.partitioning, + P.created_by AS author, + (SELECT COUNT(*) > i_limit FROM limited_partitionings) AS has_more + FROM + runs.partitionings P + WHERE + P.id_partitioning IN (SELECT LP.id_partitioning FROM limited_partitionings LP WHERE LP.rn <= i_limit) + ORDER BY + P.created_at DESC, + P.id_partitioning; + + IF NOT FOUND THEN + status := 42; + status_text := 'Partitionings not found'; + RETURN NEXT; + END IF; +END; +$$ +LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner; From 165c3dd892b57222d46b9d93967e70151e4b09e3 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 10 Sep 2024 15:33:17 +0200 Subject: [PATCH 03/18] test unfinished --- ...GetFlowPartitioningsIntegrationTests.scala | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala new file mode 100644 index 000000000..fe71d6d7d --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.flows + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +class GetFlowPartitioningsIntegrationTests extends DBTestSuite { + + private val getFlowPartitioningsFn = "flows.get_flow_partitionings" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValues": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValues": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + // insert partitionings to runs.partitionings and flows.partitioning_to_flow + +} From ad08738596a8a508bb068507c889b955ffb2a899 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 11 Sep 2024 14:29:24 +0200 Subject: [PATCH 04/18] test sql --- .../flows/V1.9.8__get_flow_partitionings.sql | 2 +- ...GetFlowPartitioningsIntegrationTests.scala | 166 +++++++++++++++++- 2 files changed, 166 insertions(+), 2 deletions(-) diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql index 4885940b2..7f27b7885 100644 --- a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -68,7 +68,7 @@ BEGIN P.created_at, ROW_NUMBER() OVER (ORDER BY P.created_at DESC, P.id_partitioning) AS rn FROM flows.partitioning_to_flow PF - JOIN runs.partitionings P ON PF.fk_partitioning = P.id_partitioning + JOIN runs.partitionings P ON PF.fk_partitioning = P.id_partitioning WHERE PF.fk_flow = i_flow_id ORDER BY P.created_at DESC, P.id_partitioning LIMIT i_limit + 1 OFFSET i_offset diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala index fe71d6d7d..f21f2fef1 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala @@ -16,12 +16,18 @@ package za.co.absa.atum.database.flows +import io.circe.Json +import io.circe.parser.parse import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString class GetFlowPartitioningsIntegrationTests extends DBTestSuite { private val getFlowPartitioningsFn = "flows.get_flow_partitionings" + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioningsTable = "runs.partitionings" private val partitioning1 = JsonBString( """ @@ -37,6 +43,19 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { |""".stripMargin ) + private val partitioning1Parent = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB"], + | "keysToValues": { + | "keyA": "valueA", + | "keyB": "valueB" + | } + |} + |""".stripMargin + ) + private val partitioning2 = JsonBString( """ |{ @@ -51,6 +70,151 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { |""".stripMargin ) - // insert partitionings to runs.partitionings and flows.partitioning_to_flow + private val partitioning3 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyG", "keyH", "keyI"], + | "keysToValues": { + | "keyG": "valueG", + | "keyH": "valueH", + | "keyI": "valueI" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfParentPartitioning1: Long = _ + var flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns partitioning(s) for a given flow") { + table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning1Parent).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Joseph")) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId1Parent: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1Parent, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + val partId3: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning3, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1Parent) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfParentPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId3) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1Parent) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + // there is a bug in flows._add_to_parent_flows, it never sets id_flow therefore returning always NULL + // assert(result1.getLong("id_flow").get == flowIdOfPartitioning1) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfPartitioning1) + .setParam("i_limit", 1) + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson == returnedPartitioningJson) + assert(!result1.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfParentPartitioning1) + .setParam("i_limit", 1) // limit is set to 1, so only one partitioning should be returned and more data available + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson1 == returnedPartitioningJson1) + assert(result1.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfParentPartitioning1) + .setParam("i_limit", 2) // limit is set to 2, so both partitionings should be returned and no more data available + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson1 == returnedPartitioningJson1) + assert(!result1.getBoolean("has_more").get) + assert(queryResult.hasNext) + assert(queryResult.hasNext) + val result2 = queryResult.next() + assert(result2.getLong("id").get == partId1Parent) + val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning1Parent) + val returnedPartitioningJson2 = parseJsonBStringOrThrow(result2.getJsonB("partitioning").get) + assert(expectedPartitioningJson2 == returnedPartitioningJson2) + assert(!result2.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + } + + test("Fails for non-existent flow"){ + function(getFlowPartitioningsFn) + .setParam("i_flow_id", 999999) + .setParam("i_limit", 1) + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 41) + assert(result1.getString("status_text").get == "Flow not found") + assert(!queryResult.hasNext) + } + } + + private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = { + parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json")) + } } From 3fb80b6ba72c6be2cc07f8b09327b4373caad414 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 11 Sep 2024 14:32:58 +0200 Subject: [PATCH 05/18] test sql --- ...GetFlowPartitioningsIntegrationTests.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala index f21f2fef1..062776b12 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala @@ -70,20 +70,6 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { |""".stripMargin ) - private val partitioning3 = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["keyG", "keyH", "keyI"], - | "keysToValues": { - | "keyG": "valueG", - | "keyH": "valueH", - | "keyI": "valueI" - | } - |} - |""".stripMargin - ) - var flowIdOfPartitioning1: Long = _ var flowIdOfParentPartitioning1: Long = _ var flowIdOfPartitioning2: Long = _ @@ -93,7 +79,6 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Joseph")) table(partitioningsTable).insert(add("partitioning", partitioning1Parent).add("created_by", "Joseph")) table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Joseph")) - table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Joseph")) val partId1: Long = table(partitioningsTable) .fieldValue("partitioning", partitioning1, "id_partitioning").get.get @@ -104,9 +89,6 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { val partId2: Long = table(partitioningsTable) .fieldValue("partitioning", partitioning2, "id_partitioning").get.get - val partId3: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning3, "id_partitioning").get.get - function(createFlowFn) .setParam("i_fk_partitioning", partId1) .setParam("i_by_user", "Joseph") @@ -128,13 +110,6 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get } - function(createFlowFn) - .setParam("i_fk_partitioning", partId3) - .setParam("i_by_user", "Joseph") - .execute { queryResult => - flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get - } - function(addToParentFlowsFn) .setParam("i_fk_parent_partitioning", partId1Parent) .setParam("i_fk_partitioning", partId1) From b6e40380f4e65708fa0e0bf1dc3c9b1553811135 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 11 Sep 2024 14:34:16 +0200 Subject: [PATCH 06/18] test sql --- .../database/flows/GetFlowPartitioningsIntegrationTests.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala index 062776b12..904cd42b9 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala @@ -118,8 +118,6 @@ class GetFlowPartitioningsIntegrationTests extends DBTestSuite { val result1 = queryResult.next() assert(result1.getInt("status").get == 11) assert(result1.getString("status_text").get == "Partitioning added to flows") - // there is a bug in flows._add_to_parent_flows, it never sets id_flow therefore returning always NULL - // assert(result1.getLong("id_flow").get == flowIdOfPartitioning1) } function(getFlowPartitioningsFn) From dcdc93ed71ebbffabb58c23cdb3b932b5736481c Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 11 Sep 2024 15:14:48 +0200 Subject: [PATCH 07/18] _add_to_parent_flows fix --- .../src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql b/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql index a117a323a..d424ebf9f 100644 --- a/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql +++ b/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql @@ -19,8 +19,7 @@ CREATE OR REPLACE FUNCTION flows._add_to_parent_flows( IN i_fk_partitioning BIGINT, IN i_by_user TEXT, OUT status INTEGER, - OUT status_text TEXT, - OUT id_flow BIGINT + OUT status_text TEXT ) RETURNS record AS $$ ------------------------------------------------------------------------------- From bb2cbf9f21e0ed486ae7bee69dc19f01d09b125e Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 12 Sep 2024 09:27:15 +0200 Subject: [PATCH 08/18] tmp --- .../controller/PartitioningController.scala | 1 - .../functions/GetFlowPartitionings.scala | 59 +++++++++++++++++++ .../repository/PartitioningRepository.scala | 9 ++- .../PartitioningRepositoryImpl.scala | 37 ++++++++++-- 4 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 12f505c35..1ced52210 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -47,7 +47,6 @@ trait PartitioningController { def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] - def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala new file mode 100644 index 000000000..eea409907 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import doobie.implicits.toSqlInterpolator +import io.circe.Json +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.Flows +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet + +class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, GetFlowPartitioningsResult, Task]( + args => + Seq( + fr"${args.flowId}", + fr"${args.limit}", + fr"${args.offset}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("id", "partitioning", "author", "has_more") +} + +object GetFlowPartitionings { + case class GetFlowPartitioningsArgs(flowId: Long, limit: Option[Int], offset: Option[Long]) + case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean) { + def toPartitioningWithIdDTO: PartitioningWithIdDTO = PartitioningWithIdDTO(id, PartitioningDTO.fromPartitioningJson(partitioningJson), author) + } + + val layer: URLayer[PostgresDatabaseProvider, GetFlowPartitionings] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetFlowPartitionings()(Flows, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 14dbfc387..66f7f28fc 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.atum.server.model.{CheckpointFromDB, PaginatedResult} import zio.IO import zio.macros.accessible @@ -43,6 +43,11 @@ trait PartitioningRepository { def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] - def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 3a275a363..3e27cc774 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -17,13 +17,16 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB, PartitioningFromDB} +import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB, PaginatedResult, PartitioningFromDB} import zio._ import zio.interop.catz.asyncInstance import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} class PartitioningRepositoryImpl( createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists, @@ -33,7 +36,8 @@ class PartitioningRepositoryImpl( getPartitioningCheckpointsFn: GetPartitioningCheckpoints, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2, - getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById + getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById, + getFlowPartitioningsFn: GetFlowPartitionings ) extends PartitioningRepository with BaseRepository { @@ -99,7 +103,6 @@ class PartitioningRepositoryImpl( } } - override def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] = { dbMultipleResultCallWithAggregatedStatus(getPartitioningMeasuresByIdFn(partitioningId), "getPartitioningMeasures") .map(_.map { case MeasureFromDB(measureName, measuredColumns) => @@ -107,6 +110,27 @@ class PartitioningRepositoryImpl( }) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[RuntimeFlags], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getFlowPartitioningsFn(GetFlowPartitioningsArgs(flowId, limit, offset)), + "getFlowPartitionings" + ) + .map(x => _.flatten) + .flatMap { checkpointItems => +// ZIO +// .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) +// .mapBoth( +// error => GeneralDatabaseError(error.getMessage), +// checkpoints => +// if (checkpointItems.nonEmpty && checkpointItems.head.hasMore) ResultHasMore(checkpoints) +// else ResultNoMore(checkpoints) +// ) +// } + } } object PartitioningRepositoryImpl { @@ -118,7 +142,8 @@ object PartitioningRepositoryImpl { with GetPartitioningCheckpoints with GetPartitioningAdditionalDataV2 with GetPartitioningById - with GetPartitioningMeasuresById, + with GetPartitioningMeasuresById + with GetFlowPartitionings, PartitioningRepository ] = ZLayer { for { @@ -130,6 +155,7 @@ object PartitioningRepositoryImpl { getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2] getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] + getFlowPartitionings <- ZIO.service[GetFlowPartitionings] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, getPartitioningMeasures, @@ -138,7 +164,8 @@ object PartitioningRepositoryImpl { getPartitioningCheckpoints, getPartitioningById, getPartitioningAdditionalDataV2, - getPartitioningMeasuresV2 + getPartitioningMeasuresV2, + getFlowPartitionings ) } } From 44ab165aae6e5cb2c9840310f54f11f955bf5c8c Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 12 Sep 2024 12:42:53 +0200 Subject: [PATCH 09/18] implementation without tests --- .../scala/za/co/absa/atum/server/Main.scala | 3 +- .../api/controller/BaseController.scala | 12 +++++ .../controller/PartitioningController.scala | 8 +++- .../PartitioningControllerImpl.scala | 18 +++++++- .../functions/GetFlowPartitionings.scala | 44 ++++++++++++++----- .../PartitioningRepositoryImpl.scala | 40 ++++++++++------- .../api/service/PartitioningService.scala | 7 +++ .../api/service/PartitioningServiceImpl.scala | 12 ++++- .../PartitioningRepositoryUnitTests.scala | 7 ++- 9 files changed, 119 insertions(+), 32 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 01360ea69..89dec80c7 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server import za.co.absa.atum.server.api.controller._ -import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints +import za.co.absa.atum.server.api.database.flows.functions.{GetFlowCheckpoints, GetFlowPartitionings} import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.http.Server @@ -62,6 +62,7 @@ object Main extends ZIOAppDefault with Server { GetPartitioningCheckpointV2.layer, GetFlowCheckpoints.layer, GetPartitioningById.layer, + GetFlowPartitionings.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 5e88951ae..fd0e9720f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.http.ApiPaths +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse._ import za.co.absa.atum.server.model._ import zio._ @@ -54,6 +55,17 @@ trait BaseController { effect.map(MultiSuccessResponse(_)) } + protected def mapToPaginatedResponse[A]( + limit: Int, + offset: Long, + effect: IO[ErrorResponse, PaginatedResult[A]] + ): IO[ErrorResponse, PaginatedResponse[A]] = { + effect.map { + case ResultHasMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = true)) + case ResultNoMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = false)) + } + } + // Root-anchored URL path // https://stackoverflow.com/questions/2005079/absolute-vs-relative-urls/78439286#78439286 protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 1ced52210..a53a9c23c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -50,4 +50,10 @@ trait PartitioningController { def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 998d422b3..a8d2f9ed5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -19,8 +19,8 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse, PaginatedResult} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import zio._ class PartitioningControllerImpl(partitioningService: PartitioningService) @@ -105,6 +105,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]]( + partitioningService.getFlowPartitionings(flowId, limit, offset) + ) + ) + } + } object PartitioningControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala index eea409907..be56853dc 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala @@ -17,8 +17,8 @@ package za.co.absa.atum.server.api.database.flows.functions import doobie.implicits.toSqlInterpolator -import io.circe.Json -import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import io.circe.{DecodingFailure, Json} +import za.co.absa.atum.model.dto.{PartitioningDTO, PartitioningWithIdDTO} import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.flows.Flows import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ @@ -30,14 +30,17 @@ import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio.{Task, URLayer, ZIO, ZLayer} import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet +import scala.annotation.tailrec + class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, GetFlowPartitioningsResult, Task]( - args => - Seq( - fr"${args.flowId}", - fr"${args.limit}", - fr"${args.offset}" - ) + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, Option[ + GetFlowPartitioningsResult + ], Task](args => + Seq( + fr"${args.flowId}", + fr"${args.limit}", + fr"${args.offset}" + ) ) with StandardStatusHandling with ByFirstErrorStatusAggregator { @@ -47,8 +50,27 @@ class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Tas object GetFlowPartitionings { case class GetFlowPartitioningsArgs(flowId: Long, limit: Option[Int], offset: Option[Long]) - case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean) { - def toPartitioningWithIdDTO: PartitioningWithIdDTO = PartitioningWithIdDTO(id, PartitioningDTO.fromPartitioningJson(partitioningJson), author) + case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean) + + object GetFlowPartitioningsResult { + + @tailrec def resultsToPartitioningWithIdDTOs( + results: Seq[GetFlowPartitioningsResult], + acc: Seq[PartitioningWithIdDTO] + ): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = { + if (results.isEmpty) Right(acc) + else { + val head = results.head + val tail = results.tail + val decodingResult = head.partitioningJson.as[PartitioningDTO] + decodingResult match { + case Left(decodingFailure) => Left(decodingFailure) + case Right(partitioningDTO) => + resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.id, partitioningDTO, head.author)) + } + } + } + } val layer: URLayer[PostgresDatabaseProvider, GetFlowPartitionings] = ZLayer { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 3e27cc774..978e470db 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -18,11 +18,21 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings -import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.{ + GetFlowPartitioningsArgs, + GetFlowPartitioningsResult +} import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB, PaginatedResult, PartitioningFromDB} +import za.co.absa.atum.server.model.{ + AdditionalDataFromDB, + AdditionalDataItemFromDB, + CheckpointFromDB, + MeasureFromDB, + PaginatedResult, + PartitioningFromDB +} import zio._ import zio.interop.catz.asyncInstance import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError @@ -112,24 +122,24 @@ class PartitioningRepositoryImpl( override def getFlowPartitionings( flowId: Long, - limit: Option[RuntimeFlags], + limit: Option[Int], offset: Option[Long] ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { dbMultipleResultCallWithAggregatedStatus( getFlowPartitioningsFn(GetFlowPartitioningsArgs(flowId, limit, offset)), "getFlowPartitionings" - ) - .map(x => _.flatten) - .flatMap { checkpointItems => -// ZIO -// .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) -// .mapBoth( -// error => GeneralDatabaseError(error.getMessage), -// checkpoints => -// if (checkpointItems.nonEmpty && checkpointItems.head.hasMore) ResultHasMore(checkpoints) -// else ResultNoMore(checkpoints) -// ) -// } + ).map(_.flatten) + .flatMap { partitioningResults => + ZIO + .fromEither(GetFlowPartitioningsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + partitionings => { + if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings) + else ResultNoMore(partitionings) + } + ) + } } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 212bdeb89..c2f4efabf 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -41,4 +42,10 @@ trait PartitioningService { def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index fc3d55e81..627938948 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -20,7 +20,7 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.atum.server.model.{CheckpointFromDB, PaginatedResult} import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -95,6 +95,16 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] = { + repositoryCall( + partitioningRepository.getFlowPartitionings(flowId, limit, offset), + "getFlowPartitionings" + ) + } } object PartitioningServiceImpl { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 4e2bec8e1..72bd35a39 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO} import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError @@ -126,6 +127,9 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresV2MockLayer = ZLayer.succeed(getPartitioningMeasuresV2Mock) + private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings]) + private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock) + override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -286,7 +290,8 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningCheckpointsMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer, - getPartitioningMeasuresV2MockLayer + getPartitioningMeasuresV2MockLayer, + getFlowPartitioningsMockLayer ) } From 6a2be40254258b213429c4aaa07cc012803de0bf Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 12 Sep 2024 13:55:48 +0200 Subject: [PATCH 10/18] GetFlowPartitioningsIntegrationTests --- ...GetFlowPartitioningsIntegrationTests.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala new file mode 100644 index 000000000..3760c62b4 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.{Scope, ZIO} +import zio.test.{Spec, TestEnvironment, assertTrue} +import zio.interop.catz.asyncInstance + +object GetFlowPartitioningsIntegrationTests extends ConfigProviderTest { + + override def spec: Spec[Unit with TestEnvironment with Scope, Any] = { + suite("GetFlowPartitioningsIntegrationTests")( + test("Returns expected DataNotFoundException when flow not found") { + for { + getFlowPartitionings <- ZIO.service[GetFlowPartitionings] + result <- getFlowPartitionings(GetFlowPartitioningsArgs(0L, None, None)) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) + } + ) + }.provide( + GetFlowPartitionings.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + +} From 10ff0b672df14487d690055b2bf598dd602251b3 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 12 Sep 2024 22:15:47 +0200 Subject: [PATCH 11/18] repository tests --- .../co/absa/atum/server/api/http/Routes.scala | 17 +++++++-- .../za/co/absa/atum/server/api/TestData.scala | 15 ++++++++ .../PartitioningRepositoryUnitTests.scala | 36 ++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 01e1ed6ec..aa09ca9d6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -25,13 +25,13 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -86,6 +86,16 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), + createServerEndpoint[ + (Long, Option[Int], Option[Long]), + ErrorResponse, + PaginatedResponse[PartitioningWithIdDTO] + ]( + getFlowPartitioningsEndpointV2, + { case (flowId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getFlowPartitionings(flowId, limit, offset) + } + ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes @@ -104,7 +114,8 @@ trait Routes extends Endpoints with ServerOptions { getPartitioningCheckpointsEndpointV2, getPartitioningCheckpointEndpointV2, getFlowCheckpointsEndpointV2, - getPartitioningMeasuresEndpointV2 + getPartitioningMeasuresEndpointV2, + getFlowPartitioningsEndpointV2 ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index d6c3995ad..7d27585a9 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -25,6 +25,7 @@ import java.util.UUID import MeasureResultDTO.TypedValue import io.circe.syntax.EncoderOps import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult trait TestData { @@ -75,6 +76,20 @@ trait TestData { author = "author" ) + protected val getFlowPartitioningsResult1: GetFlowPartitioningsResult = GetFlowPartitioningsResult( + id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + + protected val getFlowPartitioningsResult2: GetFlowPartitioningsResult = GetFlowPartitioningsResult( + id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = true + ) + // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 72bd35a39..2fd52f849 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -20,17 +20,19 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.db.fadb.exceptions.{DataNotFoundException, ErrorInDataException} import za.co.absa.db.fadb.status.{FunctionStatus, Row} import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB} +import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, PaginatedResult} object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -128,6 +130,16 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresV2MockLayer = ZLayer.succeed(getPartitioningMeasuresV2Mock) private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings]) + + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(1L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(2L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult2))))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(0L, None, None))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(3L, Some(10), Some(0))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock) @@ -280,6 +292,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralDatabaseError] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getFlowPartitionings(1L, Some(10), Some(0)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getFlowPartitionings(2L, Some(10), Some(0)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getFlowPartitionings(0L, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getFlowPartitionings(3L, Some(10), Some(0)).exit)( + failsWithA[GeneralDatabaseError] + ) + } ) ).provide( PartitioningRepositoryImpl.layer, From aa5c39fc2bcfa71594c83f3378f98afe3f1d78e6 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 13 Sep 2024 10:49:16 +0200 Subject: [PATCH 12/18] service and controller tests --- .../PartitioningControllerUnitTests.scala | 40 +++++++++++++++++- .../PartitioningServiceUnitTests.scala | 41 ++++++++++++++++--- 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 3baa18639..a3b17b2b1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -21,8 +21,9 @@ import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError.{GeneralServiceError, NotFoundServiceError} import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -69,6 +70,15 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioning(99L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getFlowPartitionings(1L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getFlowPartitionings(2L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getFlowPartitionings(3L, Some(1), Some(0))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getFlowPartitionings(4L, Some(1), Some(0))) + .thenReturn(ZIO.fail(NotFoundServiceError("Flow not found"))) + private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -158,6 +168,32 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundErrorResponse] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with more data available") { + for { + result <- PartitioningController.getFlowPartitionings(1L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = true), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with no more data available") { + for { + result <- PartitioningController.getFlowPartitionings(2L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = false), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected InternalServerErrorResponse when service call fails with GeneralServiceError") { + assertZIO(PartitioningController.getFlowPartitionings(3L, Some(1), Some(0)).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + test("Returns expected NotFoundErrorResponse when service call fails with NotFoundServiceError") { + assertZIO(PartitioningController.getFlowPartitionings(4L, Some(1), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } ) ).provide( PartitioningControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index b7db40174..7019462fb 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -22,6 +22,7 @@ import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -76,6 +77,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningMeasuresById(3L)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getFlowPartitionings(1L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getFlowPartitionings(2L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getFlowPartitionings(3L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getFlowPartitionings(4L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Flow not found"))) + private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -208,11 +218,32 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralServiceError] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getFlowPartitionings(1L, Some(1), Some(1L)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected Right with ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getFlowPartitionings(2L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected GeneralServiceError when database error occurs") { + assertZIO(PartitioningService.getFlowPartitionings(3L, Some(1), Some(1L)).exit)( + failsWithA[GeneralServiceError] + ) + }, + test("Returns expected NotFoundServiceError when flow doesn't exist") { + assertZIO(PartitioningService.getFlowPartitionings(4L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundServiceError] + ) + } ) - ).provide( - PartitioningServiceImpl.layer, - partitioningRepositoryMockLayer ) - - } + }.provide( + PartitioningServiceImpl.layer, + partitioningRepositoryMockLayer + ) } From d2b8e30e51ff42e227980a6c3b898d26d9919617 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 13 Sep 2024 15:11:04 +0200 Subject: [PATCH 13/18] GetFlowPartitioningsV2EndpointUnitTests --- ...FlowPartitioningsV2EndpointUnitTests.scala | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala new file mode 100644 index 000000000..06616eedd --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{UriContext, basicRequest} +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.{Scope, ZIO, ZLayer} + +object GetFlowPartitioningsV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.getFlowPartitionings(1L, Some(1), Some(0))) + .thenReturn( + ZIO.succeed( + PaginatedResponse(Seq.empty, Pagination(1, 0, hasMore = true), uuid1) + ) + ) + when(partitioningControllerMock.getFlowPartitionings(2L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + NotFoundErrorResponse("flow not found") + ) + ) + when(partitioningControllerMock.getFlowPartitionings(3L, None, None)) + .thenReturn( + ZIO.fail( + InternalServerErrorResponse("internal server error") + ) + ) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val getFlowPartitioningsServerEndpoint = + getFlowPartitioningsEndpointV2.zServerLogic({ case (flowId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getFlowPartitionings(flowId, limit, offset) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) + .whenServerEndpoint(getFlowPartitioningsServerEndpoint) + .thenRunLogic() + .backend() + + suite("GetFlowPartitioningsV2EndpointSuite")( + test("Returns an expected PaginatedResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/1/partitionings?limit=1&offset=0") + .response(asJson[PaginatedResponse[PartitioningWithIdDTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq.empty[PartitioningWithIdDTO], Pagination(1, 0, hasMore = true), uuid1)), + StatusCode.Ok + ) + ) + }, + test("Returns a NotFoundErrorResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/2/partitionings?limit=1&offset=0") + .response(asJson[NotFoundErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns an InternalServerErrorResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/3/partitionings") + .response(asJson[InternalServerErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + } + ) + + }.provide(partitioningControllerMockLayer) + +} From 01fd90a88a6fe470d3f59afdfd2445eeafb3a149 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 16 Sep 2024 15:01:49 +0200 Subject: [PATCH 14/18] input validation, sql pagination changed --- .../flows/V1.9.8__get_flow_partitionings.sql | 26 +++++++++++++------ .../absa/atum/server/api/http/Endpoints.scala | 6 ++--- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql index 7f27b7885..0259be0c8 100644 --- a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -25,7 +25,6 @@ CREATE OR REPLACE FUNCTION flows.get_flow_partitionings( OUT author TEXT, OUT has_more BOOLEAN ) RETURNS SETOF record AS -$$ ------------------------------------------------------------------------------- -- -- Function: flows.get_flow_partitionings(3) @@ -52,8 +51,10 @@ $$ -- 42 - Partitionings not found -- ------------------------------------------------------------------------------- +$$ +DECLARE + _has_more BOOLEAN; BEGIN - -- Check if the flow exists in runs.flows table PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id; IF NOT FOUND THEN status := 41; @@ -62,16 +63,25 @@ BEGIN RETURN; END IF; + IF i_limit IS NOT NULL THEN + SELECT count(*) > i_limit + FROM flows.partitioning_to_flow PTF + WHERE PTF.fk_flow = i_flow_id + LIMIT i_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + + RETURN QUERY WITH limited_partitionings AS ( - SELECT P.id_partitioning, - P.created_at, - ROW_NUMBER() OVER (ORDER BY P.created_at DESC, P.id_partitioning) AS rn + SELECT P.id_partitioning FROM flows.partitioning_to_flow PF JOIN runs.partitionings P ON PF.fk_partitioning = P.id_partitioning WHERE PF.fk_flow = i_flow_id ORDER BY P.created_at DESC, P.id_partitioning - LIMIT i_limit + 1 OFFSET i_offset + LIMIT i_limit OFFSET i_offset ) SELECT 11 AS status, @@ -79,11 +89,11 @@ BEGIN P.id_partitioning AS id, P.partitioning, P.created_by AS author, - (SELECT COUNT(*) > i_limit FROM limited_partitionings) AS has_more + _has_more AS has_more FROM runs.partitionings P WHERE - P.id_partitioning IN (SELECT LP.id_partitioning FROM limited_partitionings LP WHERE LP.rn <= i_limit) + P.id_partitioning IN (SELECT LP.id_partitioning FROM limited_partitionings LP) ORDER BY P.created_at DESC, P.id_partitioning; diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 34332d247..71359be3c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -24,7 +24,7 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} -import sttp.tapir.{PublicEndpoint, endpoint} +import sttp.tapir.{PublicEndpoint, Validator, endpoint} import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} import java.util.UUID @@ -143,8 +143,8 @@ trait Endpoints extends BaseEndpoints { ], Any] = { apiV2.get .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Partitionings) - .in(query[Option[Int]]("limit").default(Some(10))) - .in(query[Option[Long]]("offset").default(Some(0))) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) .out(statusCode(StatusCode.Ok)) .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) .errorOutVariantPrepend(notFoundErrorOneOfVariant) From 06b18de40d922a5f8d51087222521024d7ad5ba7 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 19 Sep 2024 10:58:02 +0200 Subject: [PATCH 15/18] pr comments addressed --- .../flows/V1.9.8__get_flow_partitionings.sql | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql index 0259be0c8..983de115e 100644 --- a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -47,8 +47,8 @@ CREATE OR REPLACE FUNCTION flows.get_flow_partitionings( -- -- Status codes: -- 11 - OK +-- 12 - OK with no partitionings found -- 41 - Flow not found --- 42 - Partitionings not found -- ------------------------------------------------------------------------------- $$ @@ -75,32 +75,26 @@ BEGIN RETURN QUERY - WITH limited_partitionings AS ( - SELECT P.id_partitioning - FROM flows.partitioning_to_flow PF - JOIN runs.partitionings P ON PF.fk_partitioning = P.id_partitioning - WHERE PF.fk_flow = i_flow_id - ORDER BY P.created_at DESC, P.id_partitioning - LIMIT i_limit OFFSET i_offset - ) SELECT 11 AS status, 'OK' AS status_text, - P.id_partitioning AS id, + P.id_partitioning, P.partitioning, - P.created_by AS author, - _has_more AS has_more + P.created_by, + _has_more FROM - runs.partitionings P + runs.partitionings P INNER JOIN + flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning WHERE - P.id_partitioning IN (SELECT LP.id_partitioning FROM limited_partitionings LP) + PF.fk_flow = i_flow_id ORDER BY - P.created_at DESC, - P.id_partitioning; + P.id_partitioning, + P.created_at DESC + LIMIT i_limit OFFSET i_offset; IF NOT FOUND THEN - status := 42; - status_text := 'Partitionings not found'; + status := 12; + status_text := 'OK with no partitionings found'; RETURN NEXT; END IF; END; From baa526e4c35062e880b72e6d8b94f8fcac03cf5a Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 23 Sep 2024 12:10:24 +0200 Subject: [PATCH 16/18] no status when no results --- .../main/postgres/flows/V1.9.8__get_flow_partitionings.sql | 6 ------ 1 file changed, 6 deletions(-) diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql index 983de115e..ddb0f25aa 100644 --- a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -91,12 +91,6 @@ BEGIN P.id_partitioning, P.created_at DESC LIMIT i_limit OFFSET i_offset; - - IF NOT FOUND THEN - status := 12; - status_text := 'OK with no partitionings found'; - RETURN NEXT; - END IF; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; From 819c05f00f703c6e6446f31f7c45aa2653d7bc5f Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Thu, 26 Sep 2024 16:31:29 +0200 Subject: [PATCH 17/18] Update database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com> --- .../src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql index ddb0f25aa..6ef71fd94 100644 --- a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -47,7 +47,6 @@ CREATE OR REPLACE FUNCTION flows.get_flow_partitionings( -- -- Status codes: -- 11 - OK --- 12 - OK with no partitionings found -- 41 - Flow not found -- ------------------------------------------------------------------------------- From 56adaa112273b71d2691889801e6c2a6fa30329c Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 26 Sep 2024 16:39:33 +0200 Subject: [PATCH 18/18] conflicts resolved --- .../repository/PartitioningRepository.scala | 2 +- .../api/service/PartitioningServiceImpl.scala | 19 +------------------ 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 24007e8d1..54ca4f700 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.{CheckpointFromDB, PaginatedResult} +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 992baf4ff..6d678e89d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.atum.server.model.{CheckpointFromDB, PaginatedResult} +import za.co.absa.atum.server.model.PaginatedResult import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -58,23 +58,6 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ServiceError, Seq[CheckpointDTO]] = { - for { - checkpointsFromDB <- repositoryCall( - partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => - ZIO - .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => GeneralServiceError(error.getMessage)) - } - } yield checkpointDTOs - - } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[ServiceError, AdditionalDataDTO] = { repositoryCall( partitioningRepository.getPartitioningAdditionalDataV2(partitioningId),