From 0037625747ea67625d7882bf5fa90f4242d24b22 Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Wed, 21 Aug 2024 08:19:07 +0200 Subject: [PATCH] Feature/232 post checkpoint (#239) postCheckpointEndpointV2 --- .../runs/V1.5.10__write_checkpoint.sql | 49 +-- .../runs/V1.9.2__write_checkpoint.sql | 109 +++++++ .../WriteCheckpointIntegrationTests.scala | 14 +- ...CheckpointOverloadedIntegrationTests.scala | 289 ++++++++++++++++++ .../absa/atum/model/dto/AtumContextDTO.scala | 1 - .../absa/atum/model/dto/CheckpointV2DTO.scala | 38 +++ .../model/dto/PartitioningSubmitDTO.scala | 2 +- .../za/co/absa/atum/model/dto/package.scala | 1 - .../model/utils/JsonSyntaxExtensions.scala | 18 +- .../za/co/absa/atum/server/Constants.scala | 1 + .../scala/za/co/absa/atum/server/Main.scala | 1 + .../api/controller/BaseController.scala | 15 +- .../api/controller/CheckpointController.scala | 9 +- .../controller/CheckpointControllerImpl.scala | 23 +- .../runs/functions/WriteCheckpointV2.scala | 57 ++++ .../atum/server/api/exception/AppError.scala | 5 +- .../server/api/exception/DatabaseError.scala | 26 ++ .../server/api/exception/ServiceError.scala | 26 ++ .../atum/server/api/http/BaseEndpoints.scala | 22 +- .../absa/atum/server/api/http/Endpoints.scala | 18 +- .../co/absa/atum/server/api/http/Routes.scala | 20 +- .../api/repository/BaseRepository.scala | 18 +- .../api/repository/CheckpointRepository.scala | 4 +- .../repository/CheckpointRepositoryImpl.scala | 20 +- .../PartitioningRepositoryImpl.scala | 4 +- .../atum/server/api/service/BaseService.scala | 13 +- .../api/service/CheckpointService.scala | 4 +- .../api/service/CheckpointServiceImpl.scala | 9 +- .../server/api/service/FlowServiceImpl.scala | 3 +- .../api/service/PartitioningServiceImpl.scala | 7 +- .../atum/server/model/CheckpointFromDB.scala | 9 +- .../atum/server/model/ErrorResponse.scala | 6 +- .../atum/server/model/PaginatedResult.scala | 1 - .../atum/server/model/SuccessResponse.scala | 2 +- server/src/test/resources/reference.conf | 22 +- .../za/co/absa/atum/server/api/TestData.scala | 25 ++ .../CheckpointControllerUnitTests.scala | 54 +++- .../controller/FlowControllerUnitTests.scala | 4 +- .../PartitioningControllerUnitTests.scala | 12 +- .../WriteCheckpointIntegrationTests.scala | 5 +- .../WriteCheckpointV2IntegrationTests.scala | 63 ++++ .../api/http/BaseEndpointsUnitTests.scala | 85 ------ ...> PostCheckpointEndpointV2UnitTests.scala} | 48 ++- .../CheckpointRepositoryUnitTests.scala | 47 ++- .../PartitioningRepositoryUnitTests.scala | 11 +- .../service/CheckpointServiceUnitTests.scala | 41 ++- .../api/service/FlowServiceUnitTests.scala | 5 +- .../PartitioningServiceUnitTests.scala | 26 +- 48 files changed, 1009 insertions(+), 283 deletions(-) create mode 100644 database/src/main/postgres/runs/V1.9.2__write_checkpoint.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala create mode 100644 model/src/main/scala/za/co/absa/atum/model/dto/CheckpointV2DTO.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/exception/DatabaseError.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/exception/ServiceError.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala delete mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala rename server/src/test/scala/za/co/absa/atum/server/api/http/{CreateCheckpointEndpointUnitTests.scala => PostCheckpointEndpointV2UnitTests.scala} (59%) diff --git a/database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql b/database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql index 833769bc1..d5983d6a2 100644 --- a/database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql +++ b/database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql @@ -57,54 +57,35 @@ $$ -- -- Status codes: -- 11 - Checkpoint created --- 14 - Checkpoint already present --- 41 - Partitioning not found +-- 31 - Checkpoint already present +-- 32 - Partitioning not found -- ------------------------------------------------------------------------------- DECLARE _fk_partitioning BIGINT; BEGIN - PERFORM 1 - FROM runs.checkpoints CP - WHERE CP.id_checkpoint = i_id_checkpoint; - - IF found THEN - status := 14; - status_text := 'Checkpoint already present'; - RETURN; - END IF; - _fk_partitioning = runs._get_id_partitioning(i_partitioning); IF _fk_partitioning IS NULL THEN - status := 41; + status := 32; status_text := 'Partitioning not found'; RETURN; END IF; - INSERT INTO runs.checkpoints (id_checkpoint, fk_partitioning, - checkpoint_name, measured_by_atum_agent, - process_start_time, process_end_time, created_by) - VALUES (i_id_checkpoint, _fk_partitioning, - i_checkpoint_name, i_measured_by_atum_agent, - i_process_start_time, i_process_end_time, i_by_user); - - -- maybe could use `jsonb_populate_record` function to be little bit more effective - PERFORM runs._write_measurement( - i_id_checkpoint, - _fk_partitioning, - UN.measurement->'measure'->>'measureName', - jsonb_array_to_text_array(UN.measurement->'measure'->'measuredColumns'), - UN.measurement->'result', - i_by_user - ) - FROM ( - SELECT unnest(i_measurements) AS measurement - ) UN; + SELECT WC.status, WC.status_text + FROM runs.write_checkpoint( + _fk_partitioning, + i_id_checkpoint, + i_checkpoint_name, + i_process_start_time, + i_process_end_time, + i_measurements, + i_measured_by_atum_agent, + i_by_user + ) WC + INTO status, status_text; - status := 11; - status_text := 'Checkpoint created'; RETURN; END; $$ diff --git a/database/src/main/postgres/runs/V1.9.2__write_checkpoint.sql b/database/src/main/postgres/runs/V1.9.2__write_checkpoint.sql new file mode 100644 index 000000000..61bbcd31f --- /dev/null +++ b/database/src/main/postgres/runs/V1.9.2__write_checkpoint.sql @@ -0,0 +1,109 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION runs.write_checkpoint( + IN i_partitioning_id BIGINT, + IN i_id_checkpoint UUID, + IN i_checkpoint_name TEXT, + IN i_process_start_time TIMESTAMP WITH TIME ZONE, + IN i_process_end_time TIMESTAMP WITH TIME ZONE, + IN i_measurements JSONB[], + IN i_measured_by_atum_agent BOOLEAN, + IN i_by_user TEXT, + OUT status INTEGER, + OUT status_text TEXT +) RETURNS record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.write_checkpoint(8) +-- Creates a checkpoint and adds all the measurements that it consists of +-- +-- Parameters: +-- i_partitioning_id - ID of the partitioning the measure belongs to +-- i_id_checkpoint - reference to the checkpoint this measure belongs into +-- i_checkpoint_name - name of the checkpoint +-- i_process_start_time - the start of processing (measuring) of the checkpoint +-- i_process_end_time - the end of the processing (measuring) of the checkpoint +-- i_measurements - array of JSON objects of the following format (values of the keys are examples only) +-- { +-- "measure": { +-- "measureName": "count", +-- "measuredColumns": ["a","b"] +-- }, +-- "result": { +-- whatever here +-- } +-- } +-- i_measured_by_atum_agent - flag it the checkpoint was measured by Atum or data provided by user +-- i_by_user - user behind the change +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- +-- Status codes: +-- 11 - Checkpoint created +-- 31 - Conflict, checkpoint already present +-- 32 - Partitioning not found +-- +------------------------------------------------------------------------------- +BEGIN + -- Check if partitioning exists + IF NOT EXISTS (SELECT 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id) THEN + status := 32; + status_text := 'Partitioning not found'; + RETURN; + END IF; + + PERFORM 1 + FROM runs.checkpoints CP + WHERE CP.id_checkpoint = i_id_checkpoint; + + IF found THEN + status := 31; + status_text := 'Checkpoint already present'; + RETURN; + END IF; + + INSERT INTO runs.checkpoints (id_checkpoint, fk_partitioning, + checkpoint_name, measured_by_atum_agent, + process_start_time, process_end_time, created_by) + VALUES (i_id_checkpoint, i_partitioning_id, + i_checkpoint_name, i_measured_by_atum_agent, + i_process_start_time, i_process_end_time, i_by_user); + + -- maybe could use `jsonb_populate_record` function to be little bit more effective + PERFORM runs._write_measurement( + i_id_checkpoint, + i_partitioning_id, + UN.measurement->'measure'->>'measureName', + jsonb_array_to_text_array(UN.measurement->'measure'->'measuredColumns'), + UN.measurement->'result', + i_by_user + ) + FROM ( + SELECT unnest(i_measurements) AS measurement + ) UN; + + status := 11; + status_text := 'Checkpoint created'; + RETURN; +END; +$$ + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.write_checkpoint(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.write_checkpoint(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala index 3c087558f..51631bfaa 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala @@ -25,6 +25,8 @@ import java.util.UUID class WriteCheckpointIntegrationTests extends DBTestSuite { + private val fnWriteCheckpoint = "runs.write_checkpoint" + private val partitioning = JsonBString( """ |{ @@ -62,7 +64,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) - function("runs.write_checkpoint") + function(fnWriteCheckpoint) .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Empty path") @@ -150,7 +152,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) - function("runs.write_checkpoint") + function(fnWriteCheckpoint) .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Happy path") @@ -237,7 +239,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { .add("created_by", origAuthor) ) - function("runs.write_checkpoint") + function(fnWriteCheckpoint) .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Won't go in") @@ -249,7 +251,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { .execute { queryResult => assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(14)) + assert(row.getInt("status").contains(31)) assert(row.getString("status_text").contains("Checkpoint already present")) } @@ -264,7 +266,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { test("Partitioning of the checkpoint does not exist") { val uuid = UUID.randomUUID val count = table("runs.checkpoints").count() - function("runs.write_checkpoint") + function(fnWriteCheckpoint) .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Won't go in") @@ -276,7 +278,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { .execute { queryResult => assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(41)) + assert(row.getInt("status").contains(32)) assert(row.getString("status_text").contains("Partitioning not found")) } assert(table("runs.checkpoints").count() == count) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala new file mode 100644 index 000000000..15f339ce3 --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala @@ -0,0 +1,289 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString +import za.co.absa.balta.classes.setter.CustomDBType + +import java.time.OffsetDateTime +import java.util.UUID + +class WriteCheckpointOverloadedIntegrationTests extends DBTestSuite { + + private val fnWriteCheckpoint = "runs.write_checkpoint" + + private val partitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValues": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + + private val measurements = + """ + |{ + | "{ + | \"measure\": { + | \"measureName\": \"count\", + | \"measuredColumns\": [] + | }, + | \"result\":{ + | \"value\":\"3\", + | \"type\":\"int\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"col1\"] + | }, + | \"result\":{ + | \"value\":\"3.14\", + | \"type\":\"double\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"a\",\"b\"] + | }, + | \"result\":{ + | \"value\":\"2.71\", + | \"type\":\"double\" + | } + | }" + |} + |""".stripMargin + + test("Write new checkpoint without data") { + val uuid = UUID.randomUUID + val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") + + + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", "John von Neumann") + ) + //DBTable's insert doesn't return the values yet correctly + val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.measure_definitions").insert( + add("fk_partitioning", fkPartitioning) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Aristoteles") + ) + + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) + + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Empty path") + .setParam("i_process_start_time", startTime) + .setParam("i_process_end_time", endTime) + .setParam("i_measurements", CustomDBType("{}", "JSONB[]")) + .setParam("i_measured_by_atum_agent", true) + .setParam("i_by_user", "J. Robert Oppenheimer") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Checkpoint created")) + } + + assert(table("runs.measure_definitions").count(add("fk_partitioning", fkPartitioning)) == 1) + assert(table("runs.measurements").count(add("fk_checkpoint", uuid)) == 0) + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 1) + table("runs.checkpoints").where(add("fk_partitioning", fkPartitioning)) {resultSet => + val row = resultSet.next() + assert(row.getString("checkpoint_name").contains("Empty path")) + assert(row.getOffsetDateTime("process_start_time").contains(startTime)) + assert(row.getOffsetDateTime("process_end_time").contains(endTime)) + assert(row.getBoolean("measured_by_atum_agent").contains(true)) + assert(row.getString("created_by").contains("J. Robert Oppenheimer")) + assert(row.getOffsetDateTime("created_at").contains(now())) + } + + } + + test("Write new checkpoint"){ + val uuid = UUID.randomUUID + val user = "Franz Kafka" + val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") + + + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", user) + ) + //DBTable's insert doesn't return the values yet correctly + val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.measure_definitions").insert( + add("fk_partitioning", fkPartitioning) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Aristoteles") + ) + + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) + + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Happy path") + .setParam("i_process_start_time", startTime) + .setParam("i_process_end_time", endTime) + .setParam("i_measurements", CustomDBType(measurements, "JSONB[]")) + .setParam("i_measured_by_atum_agent", false) + .setParam("i_by_user", user) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Checkpoint created")) + } + + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 1) + assert(table("runs.measure_definitions").count(add("fk_partitioning", fkPartitioning)) == 3) + assert(table("runs.measurements").count(add("fk_checkpoint", uuid)) == 3) + table("runs.checkpoints").where(add("fk_partitioning", fkPartitioning)) { resultSet => + val row = resultSet.next() + assert(row.getString("checkpoint_name").contains("Happy path")) + assert(row.getOffsetDateTime("process_start_time").contains(startTime)) + assert(row.getOffsetDateTime("process_end_time").contains(endTime)) + assert(row.getBoolean("measured_by_atum_agent").contains(false)) + assert(row.getString("created_by").contains(user)) + assert(row.getOffsetDateTime("created_at").contains(now())) + } + + val measureDefinitionIds = table("runs.measure_definitions") + .where(add("fk_partitioning", fkPartitioning),"id_measure_definition") { resultSet => + val row1 = resultSet.next() + val result1: Vector[Long] = Vector(row1.getLong("id_measure_definition").get) + assert(row1.getString("measure_name").contains("avg")) + assert(row1.getArray[String]("measured_columns").map(_.toList).contains(List("col1"))) + assert(row1.getString("created_by").contains("Aristoteles")) + assert(row1.getOffsetDateTime("created_at").contains(now())) + val row2 = resultSet.next() + val result2: Vector[Long] = result1 :+ row2.getLong("id_measure_definition").get + assert(row2.getString("measure_name").contains("count")) + assert(row2.getArray[String]("measured_columns").map(_.toList).contains(List.empty)) + assert(row2.getString("created_by").contains(user)) + assert(row2.getOffsetDateTime("created_at").contains(now())) + val row3 = resultSet.next() + val result3: Vector[Long] = result2 :+ row3.getLong("id_measure_definition").get + assert(row3.getString("measure_name").contains("avg")) + assert(row3.getArray[String]("measured_columns").map(_.toList).contains(List("a", "b"))) + assert(row3.getString("created_by").contains(user)) + assert(row3.getOffsetDateTime("created_at").contains(now())) + result3 + } + table("runs.measurements").where(add("fk_checkpoint", uuid), "id_measurement") { resultSet => + val row1 = resultSet.next() + // because measure definition of `count` was created only after the manual enter of the `avg`, it's actually only + // second in the list + assert(row1.getLong("fk_measure_definition").contains(measureDefinitionIds(1))) + assert(row1.getJsonB("measurement_value").contains(JsonBString("""{"type": "int", "value": "3"}"""))) + val row2 = resultSet.next() + assert(row2.getLong("fk_measure_definition").contains(measureDefinitionIds(0))) + assert(row2.getJsonB("measurement_value").contains(JsonBString("""{"type": "double", "value": "3.14"}"""))) + val row3 = resultSet.next() + assert(row3.getLong("fk_measure_definition").contains(measureDefinitionIds(2))) + assert(row3.getJsonB("measurement_value").contains(JsonBString("""{"type": "double", "value": "2.71"}"""))) + } + } + + test("Checkpoint already exists") { + val uuid = UUID.randomUUID + val origAuthor = "John von Neumann" + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", origAuthor) + ) + + //DBTable's insert doesn't return the values yet correctly + val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.checkpoints").insert( + add("id_checkpoint", uuid) + .add("fk_partitioning", fkPartitioning) + .add("checkpoint_name", "I came before") + .add("process_start_time", now()) + .add("process_end_time", now()) + .add("measured_by_atum_agent", false) + .add("created_by", origAuthor) + ) + + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Won't go in") + .setParam("i_process_start_time", now()) + .setParamNull("i_process_end_time") + .setParamNull("i_measurements") + .setParam("i_measured_by_atum_agent", true) + .setParam("i_by_user", "J. Robert Oppenheimer") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(31)) + assert(row.getString("status_text").contains("Checkpoint already present")) + } + + table("runs.checkpoints").where(add("id_checkpoint", uuid)){queryResult => + val row = queryResult.next() + assert(row.getString("checkpoint_name").contains("I came before")) + assert(row.getBoolean("measured_by_atum_agent").contains(false)) + assert(row.getString("created_by").contains(origAuthor)) + } + } + + test("Partitioning of the checkpoint does not exist") { + val uuid = UUID.randomUUID + val count = table("runs.checkpoints").count() + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", 0L) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Won't go in") + .setParam("i_process_start_time", now()) + .setParamNull("i_process_end_time") + .setParamNull("i_measurements") + .setParam("i_measured_by_atum_agent", true) + .setParam("i_by_user", "J. Robert Oppenheimer") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(32)) + assert(row.getString("status_text").contains("Partitioning not found")) + } + assert(table("runs.checkpoints").count() == count) + } + +} diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/AtumContextDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/AtumContextDTO.scala index 7510dd4ca..8bb36a312 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/AtumContextDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/AtumContextDTO.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.model.dto - import io.circe.generic.semiauto._ import io.circe._ diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointV2DTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointV2DTO.scala new file mode 100644 index 000000000..ad80b373e --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointV2DTO.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.dto + +import io.circe.{Decoder, Encoder} +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} + +import java.time.ZonedDateTime +import java.util.UUID + +case class CheckpointV2DTO( + id: UUID, + name: String, + author: String, + measuredByAtumAgent: Boolean = false, + processStartTime: ZonedDateTime, + processEndTime: Option[ZonedDateTime], + measurements: Set[MeasurementDTO] +) + +object CheckpointV2DTO { + implicit val decodeCheckpointDTO: Decoder[CheckpointV2DTO] = deriveDecoder + implicit val encodeCheckpointDTO: Encoder[CheckpointV2DTO] = deriveEncoder +} diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/PartitioningSubmitDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/PartitioningSubmitDTO.scala index ad4fce6e0..e3c5c8aad 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/PartitioningSubmitDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/PartitioningSubmitDTO.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.model.dto import io.circe.generic.semiauto._ import io.circe._ -case class PartitioningSubmitDTO ( +case class PartitioningSubmitDTO( partitioning: PartitioningDTO, parentPartitioning: Option[PartitioningDTO], authorIfNew: String diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/package.scala b/model/src/main/scala/za/co/absa/atum/model/dto/package.scala index a528e7348..e9e92829c 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/package.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/package.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.model - import io.circe._ package object dto { diff --git a/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala b/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala index 4a95546a9..d69d9dc36 100644 --- a/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala +++ b/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala @@ -22,17 +22,17 @@ import io.circe.{Decoder, Encoder} object JsonSyntaxExtensions { - implicit class JsonSerializationSyntax[T: Encoder](obj: T) { - def asJsonString: String = obj.asJson.noSpaces - } + implicit class JsonSerializationSyntax[T: Encoder](obj: T) { + def asJsonString: String = obj.asJson.noSpaces + } - implicit class JsonDeserializationSyntax(jsonStr: String) { - def as[T: Decoder]: T = { - decode[T](jsonStr) match { - case Right(value) => value - case Left(error) => throw new RuntimeException(s"Failed to decode JSON: $error") - } + implicit class JsonDeserializationSyntax(jsonStr: String) { + def as[T: Decoder]: T = { + decode[T](jsonStr) match { + case Right(value) => value + case Left(error) => throw new RuntimeException(s"Failed to decode JSON: $error") } } + } } diff --git a/server/src/main/scala/za/co/absa/atum/server/Constants.scala b/server/src/main/scala/za/co/absa/atum/server/Constants.scala index 6b200ae05..a2e6aa42c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Constants.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Constants.scala @@ -16,6 +16,7 @@ package za.co.absa.atum.server +// TODO: to be removed when v2 endpoints are implemented, replaced by ApiPaths object object Constants { object Endpoints { diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 19ab07fb8..5a07279b6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -56,6 +56,7 @@ object Main extends ZIOAppDefault with Server { CreateOrUpdateAdditionalData.layer, GetPartitioningCheckpoints.layer, WriteCheckpoint.layer, + WriteCheckpointV2.layer, GetFlowCheckpoints.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 88c133487..1d2316911 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -17,7 +17,9 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} +import za.co.absa.atum.server.api.exception.ServiceError._ +import za.co.absa.atum.server.api.http.ApiPaths +import za.co.absa.atum.server.model.{ConflictErrorResponse, ErrorResponse, InternalServerErrorResponse} import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import zio._ @@ -29,8 +31,9 @@ trait BaseController { ): IO[ErrorResponse, B] = { serviceCall - .mapError { serviceError: ServiceError => - InternalServerErrorResponse(serviceError.message) + .mapError { + case ConflictServiceError(message) => ConflictErrorResponse(message) + case GeneralServiceError(message) => InternalServerErrorResponse(message) } .flatMap { result => ZIO.succeed(onSuccessFnc(result)) @@ -49,4 +52,10 @@ trait BaseController { ): IO[ErrorResponse, MultiSuccessResponse[A]] = { effect.map(MultiSuccessResponse(_)) } + + // Root-anchored URL path + // https://stackoverflow.com/questions/2005079/absolute-vs-relative-urls/78439286#78439286 + protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = { + ZIO.succeed(s"/${ApiPaths.Api}/${ApiPaths.V2}/${parts.mkString("/")}") + } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index 6b547c3cb..d22e2e0b9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import zio.IO @@ -27,8 +27,9 @@ trait CheckpointController { def createCheckpointV1(checkpointDTO: CheckpointDTO): IO[ErrorResponse, CheckpointDTO] - def createCheckpointV2( - checkpointDTO: CheckpointDTO - ): IO[ErrorResponse, SingleSuccessResponse[CheckpointDTO]] + def postCheckpointV2( + partitioningId: Long, + checkpointV2DTO: CheckpointV2DTO + ): IO[ErrorResponse, (SingleSuccessResponse[CheckpointV2DTO], String)] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index b64c825a7..5b6e3ceb8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -16,7 +16,8 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} +import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse @@ -33,12 +34,22 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che ) } - override def createCheckpointV2( - checkpointDTO: CheckpointDTO - ): IO[ErrorResponse, SingleSuccessResponse[CheckpointDTO]] = { - mapToSingleSuccessResponse(createCheckpointV1(checkpointDTO)) + override def postCheckpointV2( + partitioningId: Long, + checkpointV2DTO: CheckpointV2DTO + ): IO[ErrorResponse, (SingleSuccessResponse[CheckpointV2DTO], String)] = { + for { + response <- mapToSingleSuccessResponse( + serviceCall[Unit, CheckpointV2DTO]( + checkpointService.saveCheckpointV2(partitioningId, checkpointV2DTO), + _ => checkpointV2DTO + ) + ) + uri <- createV2RootAnchoredResourcePath( + Seq(V2Paths.Partitionings, partitioningId.toString, V2Paths.Checkpoints, checkpointV2DTO.id.toString) + ) + } yield (response, uri) } - } object CheckpointControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala new file mode 100644 index 000000000..ff797dd70 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs +import zio._ +import io.circe.syntax._ +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut +import doobie.postgres.implicits._ +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs + +class WriteCheckpointV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieSingleResultFunctionWithStatus[WriteCheckpointArgs, Unit, Task](args => + Seq( + fr"${args.partitioningId}", + fr"${args.checkpointV2DTO.id}", + fr"${args.checkpointV2DTO.name}", + fr"${args.checkpointV2DTO.processStartTime}", + fr"${args.checkpointV2DTO.processEndTime}", + fr"${args.checkpointV2DTO.measurements.toList.map(_.asJson)}", + fr"${args.checkpointV2DTO.measuredByAtumAgent}", + fr"${args.checkpointV2DTO.author}" + ), + Some("write_checkpoint") + ) + with StandardStatusHandling + +object WriteCheckpointV2 { + case class WriteCheckpointArgs(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO) + + val layer: URLayer[PostgresDatabaseProvider, WriteCheckpointV2] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new WriteCheckpointV2()(Runs, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala b/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala index c4b129b94..ff4411b7b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala @@ -16,9 +16,6 @@ package za.co.absa.atum.server.api.exception -sealed trait AppError extends Throwable { +abstract class AppError extends Exception { def message: String } - -case class DatabaseError(message: String) extends AppError -case class ServiceError(message: String) extends AppError diff --git a/server/src/main/scala/za/co/absa/atum/server/api/exception/DatabaseError.scala b/server/src/main/scala/za/co/absa/atum/server/api/exception/DatabaseError.scala new file mode 100644 index 000000000..7ef1cee7e --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/exception/DatabaseError.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.exception + +sealed trait DatabaseError extends AppError + +object DatabaseError { + + case class GeneralDatabaseError(message: String) extends DatabaseError + case class ConflictDatabaseError(message: String) extends DatabaseError + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/exception/ServiceError.scala b/server/src/main/scala/za/co/absa/atum/server/api/exception/ServiceError.scala new file mode 100644 index 000000000..3d36a28d2 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/exception/ServiceError.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.exception + +sealed trait ServiceError extends AppError + +object ServiceError { + + case class GeneralServiceError(message: String) extends ServiceError + case class ConflictServiceError(message: String) extends ServiceError + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index 3ef8edfb1..aac2a5e7b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -19,12 +19,7 @@ package za.co.absa.atum.server.api.http import sttp.model.StatusCode import sttp.tapir.generic.auto.schemaForCaseClass import sttp.tapir.json.circe.jsonBody -import za.co.absa.atum.server.model.{ - BadRequestResponse, - ErrorResponse, - GeneralErrorResponse, - InternalServerErrorResponse -} +import za.co.absa.atum.server.model._ import sttp.tapir.typelevel.MatchType import sttp.tapir.ztapir._ import sttp.tapir.{EndpointOutput, PublicEndpoint} @@ -36,6 +31,13 @@ trait BaseEndpoints { implicit val uuidMatchType: MatchType[UUID] = (a: Any) => a.isInstanceOf[UUID] + protected val conflictErrorOneOfVariant: EndpointOutput.OneOfVariant[ConflictErrorResponse] = { + oneOfVariantFromMatchType( + StatusCode.Conflict, + jsonBody[ConflictErrorResponse] + ) + } + private val badRequestOneOfVariant: EndpointOutput.OneOfVariant[BadRequestResponse] = { oneOfVariantFromMatchType( StatusCode.BadRequest, @@ -75,12 +77,4 @@ trait BaseEndpoints { baseEndpoint.in(Api / V2) } - def pathToAPIv1CompatibleFormat(apiURLPath: String): String = { - // this is basically kebab-case/snake_case to camelCase - val inputParts = apiURLPath.split("[_-]") - - // Capitalize the first letter of each part except the first one (lowercase always) - inputParts.head.toLowerCase + inputParts.tail.map(_.capitalize).mkString("") - } - } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 792d68a25..174babd8f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -25,30 +25,34 @@ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import sttp.tapir.{PublicEndpoint, endpoint} +import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} trait Endpoints extends BaseEndpoints { protected val createCheckpointEndpointV1: PublicEndpoint[CheckpointDTO, ErrorResponse, CheckpointDTO, Any] = { apiV1.post - .in(pathToAPIv1CompatibleFormat(CreateCheckpoint)) + .in(V1Paths.CreateCheckpoint) .in(jsonBody[CheckpointDTO]) .out(statusCode(StatusCode.Created)) .out(jsonBody[CheckpointDTO]) + .errorOutVariantPrepend(conflictErrorOneOfVariant) } - protected val createCheckpointEndpointV2 - : PublicEndpoint[CheckpointDTO, ErrorResponse, SingleSuccessResponse[CheckpointDTO], Any] = { + protected val postCheckpointEndpointV2 + : PublicEndpoint[(Long, CheckpointV2DTO), ErrorResponse, (SingleSuccessResponse[CheckpointV2DTO], String), Any] = { apiV2.post - .in(CreateCheckpoint) - .in(jsonBody[CheckpointDTO]) + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Checkpoints) + .in(jsonBody[CheckpointV2DTO]) .out(statusCode(StatusCode.Created)) - .out(jsonBody[SingleSuccessResponse[CheckpointDTO]]) + .out(jsonBody[SingleSuccessResponse[CheckpointV2DTO]]) + .out(header[String]("Location")) + .errorOutVariantPrepend(conflictErrorOneOfVariant) } protected val createPartitioningEndpointV1 : PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, AtumContextDTO, Any] = { apiV1.post - .in(pathToAPIv1CompatibleFormat(CreatePartitioning)) + .in(V1Paths.CreatePartitioning) .in(jsonBody[PartitioningSubmitDTO]) .out(statusCode(StatusCode.Ok)) .out(jsonBody[AtumContextDTO]) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 1da88efce..d026abd30 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -19,14 +19,19 @@ package za.co.absa.atum.server.api.http import cats.syntax.semigroupk._ import org.http4s.HttpRoutes import sttp.tapir.PublicEndpoint +import sttp.tapir.model.ServerRequest import sttp.tapir.server.http4s.Http4sServerInterpreter import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} -import za.co.absa.atum.server.api.controller.{CheckpointController, PartitioningController, FlowController} +import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} +import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} +import za.co.absa.atum.server.model.ErrorResponse +import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -39,7 +44,16 @@ trait Routes extends Endpoints with ServerOptions { } val endpoints = List( createServerEndpoint(createCheckpointEndpointV1, CheckpointController.createCheckpointV1), - createServerEndpoint(createCheckpointEndpointV2, CheckpointController.createCheckpointV2), + createServerEndpoint[ + (Long, CheckpointV2DTO), + ErrorResponse, + (SingleSuccessResponse[CheckpointV2DTO], String) + ]( + postCheckpointEndpointV2, + { case (partitioningId: Long, checkpointV2DTO: CheckpointV2DTO) => + CheckpointController.postCheckpointV2(partitioningId, checkpointV2DTO) + } + ), createServerEndpoint(createPartitioningEndpointV1, PartitioningController.createPartitioningIfNotExistsV1), createServerEndpoint(createPartitioningEndpointV2, PartitioningController.createPartitioningIfNotExistsV2), createServerEndpoint( @@ -59,7 +73,7 @@ trait Routes extends Endpoints with ServerOptions { private def createSwaggerRoutes: HttpRoutes[HttpEnv.F] = { val endpoints = List( createCheckpointEndpointV1, - createCheckpointEndpointV2, + postCheckpointEndpointV2, createPartitioningEndpointV1, createPartitioningEndpointV2, createOrUpdateAdditionalDataEndpointV2, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala index 9f9764f91..0bb9500ab 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala @@ -17,7 +17,8 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.db.fadb.exceptions.StatusException +import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.db.fadb.exceptions.{DataConflictException, StatusException} import za.co.absa.db.fadb.status.{FailedOrRow, FailedOrRows} import zio._ @@ -32,7 +33,7 @@ trait BaseRepository { case Left(statusException: StatusException) => ZIO.logError( s"Exception caused by operation: '$operationName': " + - s"(${statusException.status.statusCode}), ${statusException.status.statusText}" + s"(${statusException.status.statusCode}) ${statusException.status.statusText}" ) case Right(_) => ZIO.logDebug(s"Operation '$operationName' succeeded in database") } @@ -40,12 +41,15 @@ trait BaseRepository { private def defaultErrorHandler(operationName: String): PartialFunction[Throwable, DatabaseError] = { case statusException: StatusException => - DatabaseError( - s"Exception caused by operation: '$operationName': " + - s"(${statusException.status.statusCode}) ${statusException.status.statusText}" - ) + val message = s"Exception caused by operation: '$operationName': " + + s"(${statusException.status.statusCode}) ${statusException.status.statusText}" + + statusException match { + case DataConflictException(_) => ConflictDatabaseError(message) + case _ => GeneralDatabaseError(message) + } case error => - DatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}") + GeneralDatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}") } def dbSingleResultCallWithStatus[R](dbFuncCall: Task[FailedOrRow[R]], operationName: String): IO[DatabaseError, R] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index 59c33d1b6..a0e51264d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.DatabaseError import zio._ import zio.macros.accessible @@ -24,5 +24,5 @@ import zio.macros.accessible @accessible trait CheckpointRepository { def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] - + def writeCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[DatabaseError, Unit] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index e63d92c9f..0fd132a4d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -16,24 +16,34 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointDTO -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} import za.co.absa.atum.server.api.exception.DatabaseError import zio._ import zio.interop.catz.asyncInstance -class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint) extends CheckpointRepository with BaseRepository { +class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint, writeCheckpointV2Fn: WriteCheckpointV2) + extends CheckpointRepository + with BaseRepository { override def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] = { dbSingleResultCallWithStatus(writeCheckpointFn(checkpointDTO), "writeCheckpoint") } + override def writeCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[DatabaseError, Unit] = { + dbSingleResultCallWithStatus( + writeCheckpointV2Fn(WriteCheckpointArgs(partitioningId, checkpointV2DTO)), + "writeCheckpoint" + ) + } } object CheckpointRepositoryImpl { - val layer: URLayer[WriteCheckpoint, CheckpointRepository] = ZLayer { + val layer: URLayer[WriteCheckpoint with WriteCheckpointV2, CheckpointRepository] = ZLayer { for { writeCheckpoint <- ZIO.service[WriteCheckpoint] - } yield new CheckpointRepositoryImpl(writeCheckpoint) + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + } yield new CheckpointRepositoryImpl(writeCheckpoint, writeCheckpointV2) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 523a98f0f..68ff69276 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -65,7 +65,9 @@ class PartitioningRepositoryImpl( }) } - override def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[DatabaseError, InitialAdditionalDataDTO] = { + override def getPartitioningAdditionalData( + partitioning: PartitioningDTO + ): IO[DatabaseError, InitialAdditionalDataDTO] = { dbMultipleResultCallWithAggregatedStatus( getPartitioningAdditionalDataFn(partitioning), "getPartitioningAdditionalData" diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala index 680dcc22d..33ca81d02 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala @@ -16,16 +16,23 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} +import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.api.exception.ServiceError._ +import za.co.absa.atum.server.api.exception._ import zio._ trait BaseService { def repositoryCall[R](repositoryCall: IO[DatabaseError, R], operationName: String): IO[ServiceError, R] = { repositoryCall - .mapError { case DatabaseError(message) => - ServiceError(s"Failed to perform '$operationName': $message") + .mapError { + case ConflictDatabaseError(message) => ConflictServiceError(createMessage(operationName, message)) + case GeneralDatabaseError(message) => GeneralServiceError(createMessage(operationName, message)) } } + private def createMessage(operationName: String, message: String): String = { + s"Failed to perform '$operationName': $message" + } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index a38811890..0d41d9dd3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError import zio.IO import zio.macros.accessible @@ -24,5 +24,5 @@ import zio.macros.accessible @accessible trait CheckpointService { def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] - + def saveCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[ServiceError, Unit] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index aae123ea4..600b037ea 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.CheckpointRepository import zio._ @@ -30,6 +30,13 @@ class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends ) } + override def saveCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[ServiceError, Unit] = { + repositoryCall( + checkpointRepository.writeCheckpointV2(partitioningId, checkpointV2DTO), + "saveCheckpoint" + ) + } + } object CheckpointServiceImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 5f788b86c..497d1c4d8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.FlowRepository import za.co.absa.atum.server.model.CheckpointFromDB import zio._ @@ -33,7 +34,7 @@ class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with B checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => ZIO .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => ServiceError(error.getMessage)) + .mapError(error => GeneralServiceError(error.getMessage)) } } yield checkpointDTOs } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index a4978718d..eebb7a2ba 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository import za.co.absa.atum.server.model.CheckpointFromDB import zio._ @@ -47,7 +48,9 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } - override def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[ServiceError, InitialAdditionalDataDTO] = { + override def getPartitioningAdditionalData( + partitioning: PartitioningDTO + ): IO[ServiceError, InitialAdditionalDataDTO] = { repositoryCall( partitioningRepository.getPartitioningAdditionalData(partitioning), "getPartitioningAdditionalData" @@ -65,7 +68,7 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => ZIO .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => ServiceError(error.getMessage)) + .mapError(error => GeneralServiceError(error.getMessage)) } } yield checkpointDTOs diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala index 0eba1e01c..c2e4177b3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala @@ -16,7 +16,14 @@ package za.co.absa.atum.server.model -import za.co.absa.atum.model.dto.{CheckpointDTO, MeasureDTO, MeasureResultDTO, MeasurementDTO, PartitioningDTO} +import za.co.absa.atum.model.dto.{ + CheckpointDTO, + CheckpointV2DTO, + MeasureDTO, + MeasureResultDTO, + MeasurementDTO, + PartitioningDTO +} import io.circe.{DecodingFailure, Json} import java.time.ZonedDateTime diff --git a/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala b/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala index ac3be1e73..78ff0b183 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala @@ -37,16 +37,14 @@ object BadRequestResponse { implicit val encodeBadRequestResponse: Encoder[BadRequestResponse] = deriveEncoder } -final case class ConflictErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) - extends ErrorResponse +final case class ConflictErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse object ConflictErrorResponse { implicit val decoderConflictErrorResponse: Decoder[ConflictErrorResponse] = deriveDecoder implicit val encoderConflictErrorResponse: Encoder[ConflictErrorResponse] = deriveEncoder } -final case class NotFoundErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) - extends ErrorResponse +final case class NotFoundErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse object NotFoundErrorResponse { implicit val decoderNotFoundErrorResponse: Decoder[NotFoundErrorResponse] = deriveDecoder diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala index f5f0b9154..e3a2898e4 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala @@ -26,4 +26,3 @@ object PaginatedResult { case class ResultNoMore[R](data: Seq[R]) extends PaginatedResult[R] } - diff --git a/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala b/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala index 3c3341300..7fb5c6da3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala @@ -40,7 +40,7 @@ object SuccessResponse { } case class PaginatedResponse[T](data: Seq[T], pagination: Pagination, requestId: UUID = UUID.randomUUID()) - extends SuccessResponse + extends SuccessResponse object PaginatedResponse { implicit def encoder[T: Encoder]: Encoder[PaginatedResponse[T]] = deriveEncoder diff --git a/server/src/test/resources/reference.conf b/server/src/test/resources/reference.conf index c4fc78a1d..05d4df973 100644 --- a/server/src/test/resources/reference.conf +++ b/server/src/test/resources/reference.conf @@ -1,5 +1,5 @@ { - postgres { + postgres { # The JDBC driver class dataSourceClass=org.postgresql.Driver serverName=localhost @@ -11,4 +11,24 @@ # maximum number of connections that HikariCP will keep in the pool, including both idle and in-use connections maxPoolSize=10 } + aws { + region = "af-south-1" + dbPasswordSecretName = "serviceUserSecretKey" + } + ssl { + enabled=false + keyStorePassword=password + keyStorePath="/path/to/your/cert" + } + monitoring { + # monitoring of http communication + http { + enabled=true + } + # monitoring of jvm and zio + jvm { + enabled=true + intervalInSeconds=5 + } + } } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 5e58ee4ac..5ce0fe048 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -191,6 +191,31 @@ trait TestData { ) protected val checkpointDTO3: CheckpointDTO = checkpointDTO1.copy(id = UUID.randomUUID()) + protected val checkpointDTO4: CheckpointDTO = checkpointDTO1.copy(id = UUID.randomUUID()) + + // Checkpoint V2 DTO + protected val checkpointV2DTO1: CheckpointV2DTO = CheckpointV2DTO( + id = UUID.randomUUID(), + name = checkpointQueryDTO1.checkpointName.get, + author = "author", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.now(), + processEndTime = Some(ZonedDateTime.now()), + measurements = measurementsDTO1.toSet + ) + + protected val checkpointV2DTO2: CheckpointV2DTO = CheckpointV2DTO( + id = UUID.randomUUID(), + name = checkpointQueryDTO2.checkpointName.get, + author = "author2", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.now(), + processEndTime = Some(ZonedDateTime.now()), + measurements = measurementsDTO2.toSet + ) + + protected val checkpointV2DTO3: CheckpointV2DTO = checkpointV2DTO1.copy(id = UUID.randomUUID()) + protected val checkpointV2DTO4: CheckpointV2DTO = checkpointV2DTO1.copy(id = UUID.randomUUID()) // Checkpoint From DB protected val checkpointFromDB1: CheckpointFromDB = CheckpointFromDB( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala index 7554742eb..6b4ed6b2e 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala @@ -17,23 +17,34 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.CheckpointService -import za.co.absa.atum.server.model.InternalServerErrorResponse -import zio.test.Assertion.failsWithA +import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.{ConflictErrorResponse, InternalServerErrorResponse} import zio._ +import zio.test.Assertion.failsWithA import zio.test._ -object CheckpointControllerUnitTests extends ZIOSpecDefault with TestData { +object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { private val checkpointServiceMock = mock(classOf[CheckpointService]) - when(checkpointServiceMock.saveCheckpoint(checkpointDTO1)).thenReturn(ZIO.succeed(())) + when(checkpointServiceMock.saveCheckpoint(checkpointDTO1)).thenReturn(ZIO.unit) when(checkpointServiceMock.saveCheckpoint(checkpointDTO2)) - .thenReturn(ZIO.fail(ServiceError("error in data"))) + .thenReturn(ZIO.fail(GeneralServiceError("error in data"))) when(checkpointServiceMock.saveCheckpoint(checkpointDTO3)) - .thenReturn(ZIO.fail(ServiceError("boom!"))) + .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) + + private val partitioningId = 1L + + when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO1)).thenReturn(ZIO.unit) + when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO2)) + .thenReturn(ZIO.fail(GeneralServiceError("error in data"))) + when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO3)) + .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) private val checkpointServiceMockLayer = ZLayer.succeed(checkpointServiceMock) @@ -46,15 +57,36 @@ object CheckpointControllerUnitTests extends ZIOSpecDefault with TestData { result <- CheckpointController.createCheckpointV1(checkpointDTO1) } yield assertTrue(result == checkpointDTO1) }, - test("Returns expected InternalServerErrorResponse") { - assertZIO(CheckpointController.createCheckpointV1(checkpointDTO3).exit)( + test("Returns expected ConflictServiceError") { + assertZIO(CheckpointController.createCheckpointV1(checkpointDTO2).exit)( failsWithA[InternalServerErrorResponse] ) }, - test("Returns expected GeneralErrorResponse") { - assertZIO(CheckpointController.createCheckpointV1(checkpointDTO2).exit)( + test("Returns expected ConflictServiceError") { + assertZIO(CheckpointController.createCheckpointV1(checkpointDTO3).exit)( + failsWithA[ConflictErrorResponse] + ) + } + ), + suite("PostCheckpointV2Suite")( + test("Returns expected CheckpointDTO") { + for { + result <- CheckpointController.postCheckpointV2(partitioningId, checkpointV2DTO1) + } yield assertTrue( + result._1.isInstanceOf[SingleSuccessResponse[CheckpointV2DTO]] + && result._1.data == checkpointV2DTO1 + && result._2 == s"/api/v2/partitionings/$partitioningId/checkpoints/${checkpointV2DTO1.id}" + ) + }, + test("Returns expected ConflictServiceError") { + assertZIO(CheckpointController.postCheckpointV2(1L, checkpointV2DTO2).exit)( failsWithA[InternalServerErrorResponse] ) + }, + test("Returns expected ConflictServiceError") { + assertZIO(CheckpointController.postCheckpointV2(1L, checkpointV2DTO3).exit)( + failsWithA[ConflictErrorResponse] + ) } ) ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala index 4cf152128..4d2f90fe3 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.FlowService import za.co.absa.atum.server.model.InternalServerErrorResponse import zio._ @@ -28,7 +28,7 @@ import zio.test._ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { private val flowServiceMock = mock(classOf[FlowService]) when(flowServiceMock.getFlowCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.fail(ServiceError("boom!"))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) when(flowServiceMock.getFlowCheckpoints(checkpointQueryDTO2)) .thenReturn(ZIO.succeed(Seq(checkpointDTO2))) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 5a78b5012..30ac2ab0b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.ServiceError.GeneralServiceError import za.co.absa.atum.server.api.service.PartitioningService import za.co.absa.atum.server.model.InternalServerErrorResponse import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse @@ -31,9 +31,9 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { private val partitioningServiceMock = mock(classOf[PartitioningService]) when(partitioningServiceMock.createPartitioningIfNotExists(partitioningSubmitDTO1)) - .thenReturn(ZIO.succeed(())) + .thenReturn(ZIO.unit) when(partitioningServiceMock.createPartitioningIfNotExists(partitioningSubmitDTO2)) - .thenReturn(ZIO.fail(ServiceError("boom!"))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) when(partitioningServiceMock.getPartitioningMeasures(partitioningDTO1)) .thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) @@ -42,16 +42,16 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { .thenReturn(ZIO.succeed(Map.empty)) when(partitioningServiceMock.createOrUpdateAdditionalData(additionalDataSubmitDTO1)) - .thenReturn(ZIO.succeed(())) + .thenReturn(ZIO.unit) when(partitioningServiceMock.createOrUpdateAdditionalData(additionalDataSubmitDTO2)) - .thenReturn(ZIO.fail(ServiceError("boom!"))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO1)) .thenReturn(ZIO.succeed(Seq(checkpointDTO1, checkpointDTO2))) when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO2)) .thenReturn(ZIO.succeed(Seq.empty)) when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO3)) - .thenReturn(ZIO.fail(ServiceError("boom!"))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala index 2f1b8ffe5..b5a93234f 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala @@ -22,7 +22,7 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException} import za.co.absa.db.fadb.status.FunctionStatus import zio._ import zio.interop.catz.asyncInstance @@ -37,7 +37,6 @@ object WriteCheckpointIntegrationTests extends ConfigProviderTest { suite("WriteCheckpointSuite")( test("Returns expected Left with DataNotFoundException as related partitioning is not in the database") { - val checkpointDTO = CheckpointDTO( id = UUID.randomUUID(), name = "name", @@ -52,7 +51,7 @@ object WriteCheckpointIntegrationTests extends ConfigProviderTest { for { writeCheckpoint <- ZIO.service[WriteCheckpoint] result <- writeCheckpoint(checkpointDTO) - } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + } yield assertTrue(result == Left(DataConflictException(FunctionStatus(32, "Partitioning not found")))) } ).provide( WriteCheckpoint.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala new file mode 100644 index 000000000..87a2ec89a --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.model.dto._ +import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs +import za.co.absa.db.fadb.exceptions.DataConflictException +import za.co.absa.db.fadb.status.{FunctionStatus, Row} +import zio._ +import zio.interop.catz.asyncInstance +import zio.test._ + +import java.time.ZonedDateTime +import java.util.UUID + +object WriteCheckpointV2IntegrationTests extends ConfigProviderTest { + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + suite("WriteCheckpointV2Suite")( + test("Returns expected Left with DataNotFoundException as related partitioning is not in the database") { + val checkpointV2DTO = CheckpointV2DTO( + id = UUID.randomUUID(), + name = "name", + author = "author", + processStartTime = ZonedDateTime.now(), + processEndTime = Option(ZonedDateTime.now()), + measurements = Set( + MeasurementDTO(MeasureDTO("count", Seq("*")), MeasureResultDTO(TypedValue("1", ResultValueType.LongValue))) + ) + ) + for { + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + result <- writeCheckpointV2(WriteCheckpointArgs(1L, checkpointV2DTO)) + } yield assertTrue(result == Left(DataConflictException(FunctionStatus(32, "Partitioning not found")))) + } + ).provide( + WriteCheckpointV2.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + } + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala deleted file mode 100644 index e8798974b..000000000 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.server.api.http - -import org.scalatest.flatspec.AnyFlatSpec - -class BaseEndpointsUnitTests extends AnyFlatSpec { - - object BaseEndpointsForTests extends BaseEndpoints - - "pathToAPIv1CompatibleFormat" should "successfully handle empty input" in { - val input = "" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "" - assert(actual == expected) - } - - "pathToAPIv1CompatibleFormat" should - "successfully convert our standard API path format to format compatible with API V1 (kebab)" in { - - val input = "create-checkpoint" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheckpoint" - assert(actual == expected) - } - - "pathToAPIv1CompatibleFormat" should - "successfully convert our standard API path format to format compatible with API V1 (kebab2)" in { - - val input = "create-check-point2" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheckPoint2" - assert(actual == expected) - } - - "pathToAPIv1CompatibleFormat" should - "successfully convert our standard API path format to format compatible with API V1 (kebab3)" in { - - val input = "Create-check-" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheck" - assert(actual == expected) - } - - "pathToAPIv1CompatibleFormat" should - "successfully convert our standard API path format to format compatible with API V1 (snake)" in { - - val input = "_create_check_point" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "CreateCheckPoint" - assert(actual == expected) - } - - "pathToAPIv1CompatibleFormat" should - "successfully convert our standard API path format to format compatible with API V1 (kebab and snake)" in { - - val input = "Create-check_Point" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheckPoint" - assert(actual == expected) - } - - "pathToAPIv1CompatibleFormat" should - "successfully convert our standard API path format to format compatible with API V1 (one word)" in { - - val input = "createcheckpoint" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createcheckpoint" - assert(actual == expected) - } -} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/PostCheckpointEndpointV2UnitTests.scala similarity index 59% rename from server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/http/PostCheckpointEndpointV2UnitTests.scala index 0621d7ca4..760a1081e 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/PostCheckpointEndpointV2UnitTests.scala @@ -23,55 +23,62 @@ import sttp.client3.circe._ import sttp.model.StatusCode import sttp.tapir.server.stub.TapirStubInterpreter import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} -import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.controller.CheckpointController -import za.co.absa.atum.server.model.{GeneralErrorResponse, InternalServerErrorResponse} +import za.co.absa.atum.server.model.{ConflictErrorResponse, GeneralErrorResponse, InternalServerErrorResponse} import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import zio._ import zio.test.Assertion.equalTo import zio.test._ -object CreateCheckpointEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { +object PostCheckpointEndpointV2UnitTests extends ZIOSpecDefault with Endpoints with TestData { private val checkpointControllerMock = mock(classOf[CheckpointController]) - when(checkpointControllerMock.createCheckpointV2(checkpointDTO1)) - .thenReturn(ZIO.succeed(SingleSuccessResponse(checkpointDTO1, uuid))) - when(checkpointControllerMock.createCheckpointV2(checkpointDTO2)) + when(checkpointControllerMock.postCheckpointV2(1L, checkpointV2DTO1)) + .thenReturn(ZIO.succeed((SingleSuccessResponse(checkpointV2DTO1, uuid), "some location"))) + when(checkpointControllerMock.postCheckpointV2(1L, checkpointV2DTO2)) .thenReturn(ZIO.fail(GeneralErrorResponse("error"))) - when(checkpointControllerMock.createCheckpointV2(checkpointDTO3)) + when(checkpointControllerMock.postCheckpointV2(1L, checkpointV2DTO3)) .thenReturn(ZIO.fail(InternalServerErrorResponse("error"))) + when(checkpointControllerMock.postCheckpointV2(1L, checkpointV2DTO4)) + .thenReturn(ZIO.fail(ConflictErrorResponse("error"))) private val checkpointControllerMockLayer = ZLayer.succeed(checkpointControllerMock) - private val createCheckpointServerEndpoint = createCheckpointEndpointV2 - .zServerLogic(CheckpointController.createCheckpointV2) + private val postCheckpointServerEndpointV2 = postCheckpointEndpointV2 + .zServerLogic({ case (partitioningId: Long, checkpointV2DTO: CheckpointV2DTO) => + CheckpointController.postCheckpointV2(partitioningId, checkpointV2DTO) + }) def spec: Spec[TestEnvironment with Scope, Any] = { val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[CheckpointController])) - .whenServerEndpoint(createCheckpointServerEndpoint) + .whenServerEndpoint(postCheckpointServerEndpointV2) .thenRunLogic() .backend() val request = basicRequest - .post(uri"https://test.com/api/v2/create-checkpoint") - .response(asJson[SingleSuccessResponse[CheckpointDTO]]) + .post(uri"https://test.com/api/v2/partitionings/1/checkpoints") + .response(asJson[SingleSuccessResponse[CheckpointV2DTO]]) suite("CreateCheckpointEndpointSuite")( test("Returns expected CheckpointDTO") { val response = request - .body(checkpointDTO1) + .body(checkpointV2DTO1) .send(backendStub) val body = response.map(_.body) val statusCode = response.map(_.code) + val header = response.map(_.header("Location")) - assertZIO(body <&> statusCode)(equalTo(Right(SingleSuccessResponse(checkpointDTO1, uuid)), StatusCode.Created)) + assertZIO(body <&> statusCode <&> header)( + equalTo(Right(SingleSuccessResponse(checkpointV2DTO1, uuid)), StatusCode.Created, Some("some location")) + ) }, test("Returns expected BadRequest") { val response = request - .body(checkpointDTO2) + .body(checkpointV2DTO2) .send(backendStub) val statusCode = response.map(_.code) @@ -80,12 +87,21 @@ object CreateCheckpointEndpointUnitTests extends ZIOSpecDefault with Endpoints w }, test("Returns expected InternalServerError") { val response = request - .body(checkpointDTO3) + .body(checkpointV2DTO3) .send(backendStub) val statusCode = response.map(_.code) assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + }, + test("Returns expected ConflictError") { + val response = request + .body(checkpointV2DTO4) + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.Conflict)) } ) }.provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index e7af6ecf1..407f5607b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -17,28 +17,40 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData -import za.co.absa.db.fadb.exceptions.ErrorInDataException +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs +import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.db.fadb.exceptions.DataConflictException import za.co.absa.db.fadb.status.FunctionStatus import zio._ import zio.interop.catz.asyncInstance -import zio.test.Assertion.{failsWithA, isUnit} +import zio.test.Assertion.failsWithA import zio.test._ - import za.co.absa.db.fadb.status.Row object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) + private val writeCheckpointV2Mock: WriteCheckpointV2 = mock(classOf[WriteCheckpointV2]) when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(writeCheckpointMock.apply(checkpointDTO2)) - .thenReturn(ZIO.fail(DatabaseError("Operation 'writeCheckpoint' failed with unexpected error: null"))) + .thenReturn(ZIO.fail(DataConflictException(FunctionStatus(31, "conflict")))) when(writeCheckpointMock.apply(checkpointDTO3)).thenReturn(ZIO.fail(new Exception("boom!"))) + private val partitioningId = 1L + + when(writeCheckpointV2Mock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO1))) + .thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) + when(writeCheckpointV2Mock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO2))) + .thenReturn(ZIO.left(DataConflictException(FunctionStatus(32, "Partitioning not found")))) + when(writeCheckpointV2Mock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO3))) + .thenReturn(ZIO.fail(new Exception("boom!"))) + private val writeCheckpointMockLayer = ZLayer.succeed(writeCheckpointMock) + private val writeCheckpointV2MockLayer = ZLayer.succeed(writeCheckpointV2Mock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -50,17 +62,32 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { - for { - result <- CheckpointRepository.writeCheckpoint(checkpointDTO2).exit - } yield assertTrue( - result == Exit.fail(DatabaseError("Operation 'writeCheckpoint' failed with unexpected error: null")) + assertZIO(CheckpointRepository.writeCheckpoint(checkpointDTO2).exit)( + failsWithA[ConflictDatabaseError] ) }, test("Returns expected DatabaseError") { assertZIO(CheckpointRepository.writeCheckpoint(checkpointDTO3).exit)(failsWithA[DatabaseError]) } + ), + suite("WriteCheckpointV2Suite")( + test("Returns an expected Unit") { + for { + result <- CheckpointRepository.writeCheckpointV2(partitioningId, checkpointV2DTO1) + } yield assertTrue(result == ()) + }, + test("Fails with an expected ConflictDatabaseError") { + assertZIO(CheckpointRepository.writeCheckpointV2(partitioningId, checkpointV2DTO2).exit)( + failsWithA[ConflictDatabaseError] + ) + }, + test("Fails with an expected GeneralDatabaseError") { + assertZIO(CheckpointRepository.writeCheckpointV2(partitioningId, checkpointV2DTO3).exit)( + failsWithA[GeneralDatabaseError] + ) + } ) - ).provide(CheckpointRepositoryImpl.layer, writeCheckpointMockLayer) + ).provide(CheckpointRepositoryImpl.layer, writeCheckpointMockLayer, writeCheckpointV2MockLayer) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 1c054613f..c1174e3a2 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -20,6 +20,7 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.db.fadb.exceptions.ErrorInDataException import za.co.absa.db.fadb.status.{FunctionStatus, Row} import zio._ @@ -63,7 +64,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { Seq(Row(FunctionStatus(0, "success"), measureFromDB1), Row(FunctionStatus(0, "success"), measureFromDB2)) ) ) - when(getPartitioningMeasuresMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(getPartitioningMeasuresMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) private val getPartitioningMeasuresMockLayer = ZLayer.succeed(getPartitioningMeasuresMock) @@ -72,7 +73,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { when(getPartitioningAdditionalDataMock.apply(partitioningDTO1)) .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), AdditionalDataFromDB(Some("key"), Some("value")))))) - when(getPartitioningAdditionalDataMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(getPartitioningAdditionalDataMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) @@ -82,7 +83,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)) .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1)))) when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.right(Seq.empty)) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) @@ -100,7 +101,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { result <- PartitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO2).exit } yield assertTrue( result == Exit.fail( - DatabaseError( + GeneralDatabaseError( "Exception caused by operation: 'createPartitioningIfNotExists': (50) error in Partitioning data" ) ) @@ -123,7 +124,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { result <- PartitioningRepository.createOrUpdateAdditionalData(additionalDataSubmitDTO2).exit } yield assertTrue( result == Exit.fail( - DatabaseError("Exception caused by operation: 'createOrUpdateAdditionalData': (50) error in AD data") + GeneralDatabaseError("Exception caused by operation: 'createOrUpdateAdditionalData': (50) error in AD data") ) ) }, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala index f73c21c8c..57a706cfd 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala @@ -18,7 +18,8 @@ package za.co.absa.atum.server.api.service import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} +import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.CheckpointRepository import zio.test.Assertion.failsWithA import zio.test._ @@ -28,9 +29,18 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { private val checkpointRepositoryMock = mock(classOf[CheckpointRepository]) - when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO1)).thenReturn(ZIO.succeed(())) - when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO2)).thenReturn(ZIO.fail(DatabaseError("error in data"))) - when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO3)).thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO1)).thenReturn(ZIO.unit) + when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO2)) + .thenReturn(ZIO.fail(ConflictDatabaseError("error in data"))) + when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO3)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + + private val partitioningId = 1L + + when(checkpointRepositoryMock.writeCheckpointV2(partitioningId, checkpointV2DTO1)).thenReturn(ZIO.unit) + when(checkpointRepositoryMock.writeCheckpointV2(partitioningId, checkpointV2DTO2)) + .thenReturn(ZIO.fail(ConflictDatabaseError("conflict in data"))) + when(checkpointRepositoryMock.writeCheckpointV2(partitioningId, checkpointV2DTO3)) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) private val checkpointRepositoryMockLayer = ZLayer.succeed(checkpointRepositoryMock) @@ -46,10 +56,29 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Left with StatusException") { for { result <- CheckpointService.saveCheckpoint(checkpointDTO2).exit - } yield assertTrue(result == Exit.fail(ServiceError("Failed to perform 'saveCheckpoint': error in data"))) + } yield assertTrue( + result == Exit.fail(ConflictServiceError("Failed to perform 'saveCheckpoint': error in data")) + ) }, test("Returns expected ServiceError") { - assertZIO(CheckpointService.saveCheckpoint(checkpointDTO3).exit)(failsWithA[ServiceError]) + assertZIO(CheckpointService.saveCheckpoint(checkpointDTO3).exit)(failsWithA[GeneralServiceError]) + } + ), + suite("SaveCheckpointV2Suite")( + test("Returns an expected Unit") { + for { + result <- CheckpointService.saveCheckpointV2(partitioningId, checkpointV2DTO1) + } yield assertTrue(result == ()) + }, + test("Fails with an expected ConflictServiceError") { + assertZIO(CheckpointService.saveCheckpointV2(partitioningId, checkpointV2DTO2).exit)( + failsWithA[ConflictServiceError] + ) + }, + test("Fails with an expected GeneralServiceError") { + assertZIO(CheckpointService.saveCheckpointV2(partitioningId, checkpointV2DTO3).exit)( + failsWithA[GeneralServiceError] + ) } ) ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala index da1bfba87..a3c856fec 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala @@ -18,7 +18,8 @@ package za.co.absa.atum.server.api.service import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} +import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError +import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.FlowRepository import zio._ import zio.test.Assertion.failsWithA @@ -27,7 +28,7 @@ import zio.test._ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { private val flowRepositoryMock = mock(classOf[FlowRepository]) - when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO1)).thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO1)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO2)) .thenReturn(ZIO.succeed(Seq(checkpointFromDB2))) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index 34da663c0..0c97e1d7e 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -18,7 +18,9 @@ package za.co.absa.atum.server.api.service import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} +import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository import zio.test.Assertion.failsWithA import zio.test._ @@ -28,32 +30,32 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { private val partitioningRepositoryMock = mock(classOf[PartitioningRepository]) - when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO1)).thenReturn(ZIO.succeed(())) + when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO1)).thenReturn(ZIO.unit) when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO2)) - .thenReturn(ZIO.fail(DatabaseError("error in data"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("error in data"))) when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO3)) - .thenReturn(ZIO.fail(DatabaseError("boom!"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO1)).thenReturn(ZIO.succeed(())) + when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO1)).thenReturn(ZIO.unit) when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO2)) - .thenReturn(ZIO.fail(DatabaseError("error in AD data"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("error in AD data"))) when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO3)) - .thenReturn(ZIO.fail(DatabaseError("boom!"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) when(partitioningRepositoryMock.getPartitioningMeasures(partitioningDTO1)) .thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) when(partitioningRepositoryMock.getPartitioningMeasures(partitioningDTO2)) - .thenReturn(ZIO.fail(DatabaseError("boom!"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) when(partitioningRepositoryMock.getPartitioningAdditionalData(partitioningDTO1)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningRepositoryMock.getPartitioningAdditionalData(partitioningDTO2)) - .thenReturn(ZIO.fail(DatabaseError("boom!"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO1)) .thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2))) when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.fail(DatabaseError("boom!"))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) @@ -70,7 +72,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { for { result <- PartitioningService.createPartitioningIfNotExists(partitioningSubmitDTO2).exit } yield assertTrue( - result == Exit.fail(ServiceError("Failed to perform 'createPartitioningIfNotExists': error in data")) + result == Exit.fail(GeneralServiceError("Failed to perform 'createPartitioningIfNotExists': error in data")) ) }, test("Returns expected ServiceError") { @@ -89,7 +91,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { for { result <- PartitioningService.createOrUpdateAdditionalData(additionalDataSubmitDTO2).exit } yield assertTrue( - result == Exit.fail(ServiceError("Failed to perform 'createOrUpdateAdditionalData': error in AD data")) + result == Exit.fail(GeneralServiceError("Failed to perform 'createOrUpdateAdditionalData': error in AD data")) ) }, test("Returns expected ServiceError") {