From 867cf408d0109ef033170935498255548c1b453b Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 20 Aug 2024 10:24:32 +0200 Subject: [PATCH] overloaded sql code --- ...t_v1.sql => V1.5.10__write_checkpoint.sql} | 39 ++++---- .../WriteCheckpointIntegrationTests.scala | 81 ++++++++-------- ...heckpointOverloadedIntegrationTests.scala} | 93 ++++++++++--------- .../scala/za/co/absa/atum/server/Main.scala | 2 +- .../runs/functions/WriteCheckpoint.scala | 27 +++--- ...kpointV1.scala => WriteCheckpointV2.scala} | 38 ++++---- .../repository/CheckpointRepositoryImpl.scala | 16 ++-- .../WriteCheckpointIntegrationTests.scala | 16 ++-- ...> WriteCheckpointV2IntegrationTests.scala} | 20 ++-- .../CheckpointRepositoryUnitTests.scala | 22 ++--- 10 files changed, 180 insertions(+), 174 deletions(-) rename database/src/main/postgres/runs/{V1.5.10__write_checkpoint_v1.sql => V1.5.10__write_checkpoint.sql} (76%) rename database/src/test/scala/za/co/absa/atum/database/runs/{WriteCheckpointV1IntegrationTests.scala => WriteCheckpointOverloadedIntegrationTests.scala} (88%) rename server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/{WriteCheckpointV1.scala => WriteCheckpointV2.scala} (58%) rename server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/{WriteCheckpointV1IntegrationTests.scala => WriteCheckpointV2IntegrationTests.scala} (76%) diff --git a/database/src/main/postgres/runs/V1.5.10__write_checkpoint_v1.sql b/database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql similarity index 76% rename from database/src/main/postgres/runs/V1.5.10__write_checkpoint_v1.sql rename to database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql index 1d2e3c149..d5983d6a2 100644 --- a/database/src/main/postgres/runs/V1.5.10__write_checkpoint_v1.sql +++ b/database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql @@ -14,7 +14,7 @@ */ -CREATE OR REPLACE FUNCTION runs.write_checkpoint_v1( +CREATE OR REPLACE FUNCTION runs.write_checkpoint( IN i_partitioning JSONB, IN i_id_checkpoint UUID, IN i_checkpoint_name TEXT, @@ -29,7 +29,7 @@ CREATE OR REPLACE FUNCTION runs.write_checkpoint_v1( $$ ------------------------------------------------------------------------------- -- --- Function: runs.write_checkpoint_v1(10) +-- Function: runs.write_checkpoint(10) -- Creates a checkpoint and adds all the measurements that it consists of -- -- Parameters: @@ -63,28 +63,33 @@ $$ ------------------------------------------------------------------------------- DECLARE _fk_partitioning BIGINT; - result RECORD; BEGIN _fk_partitioning = runs._get_id_partitioning(i_partitioning); - result = runs.write_checkpoint( - _fk_partitioning, - i_id_checkpoint, - i_checkpoint_name, - i_process_start_time, - i_process_end_time, - i_measurements, - i_measured_by_atum_agent, - i_by_user - ); + IF _fk_partitioning IS NULL THEN + status := 32; + status_text := 'Partitioning not found'; + RETURN; + END IF; + + SELECT WC.status, WC.status_text + FROM runs.write_checkpoint( + _fk_partitioning, + i_id_checkpoint, + i_checkpoint_name, + i_process_start_time, + i_process_end_time, + i_measurements, + i_measured_by_atum_agent, + i_by_user + ) WC + INTO status, status_text; - status := result.status; - status_text := result.status_text; RETURN; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; -ALTER FUNCTION runs.write_checkpoint_v1(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner; -GRANT EXECUTE ON FUNCTION runs.write_checkpoint_v1(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user; +ALTER FUNCTION runs.write_checkpoint(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.write_checkpoint(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala index c816d25ac..51631bfaa 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointIntegrationTests.scala @@ -42,42 +42,6 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { |""".stripMargin ) - private val measurements = - """ - |{ - | "{ - | \"measure\": { - | \"measureName\": \"count\", - | \"measuredColumns\": [] - | }, - | \"result\":{ - | \"value\":\"3\", - | \"type\":\"int\" - | } - | }", - | "{ - | \"measure\": { - | \"measureName\": \"avg\", - | \"measuredColumns\": [\"col1\"] - | }, - | \"result\":{ - | \"value\":\"3.14\", - | \"type\":\"double\" - | } - | }", - | "{ - | \"measure\": { - | \"measureName\": \"avg\", - | \"measuredColumns\": [\"a\",\"b\"] - | }, - | \"result\":{ - | \"value\":\"2.71\", - | \"type\":\"double\" - | } - | }" - |} - |""".stripMargin - test("Write new checkpoint without data") { val uuid = UUID.randomUUID val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") @@ -101,7 +65,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) function(fnWriteCheckpoint) - .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Empty path") .setParam("i_process_start_time", startTime) @@ -136,7 +100,41 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { val user = "Franz Kafka" val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") - + val measurements = + """ + |{ + | "{ + | \"measure\": { + | \"measureName\": \"count\", + | \"measuredColumns\": [] + | }, + | \"result\":{ + | \"value\":\"3\", + | \"type\":\"int\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"col1\"] + | }, + | \"result\":{ + | \"value\":\"3.14\", + | \"type\":\"double\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"a\",\"b\"] + | }, + | \"result\":{ + | \"value\":\"2.71\", + | \"type\":\"double\" + | } + | }" + |} + |""".stripMargin table("runs.partitionings").insert( add("partitioning", partitioning) @@ -155,7 +153,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) function(fnWriteCheckpoint) - .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Happy path") .setParam("i_process_start_time", startTime) @@ -242,7 +240,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { ) function(fnWriteCheckpoint) - .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Won't go in") .setParam("i_process_start_time", now()) @@ -269,7 +267,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { val uuid = UUID.randomUUID val count = table("runs.checkpoints").count() function(fnWriteCheckpoint) - .setParam("i_partitioning_id", 0L) + .setParam("i_partitioning", partitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Won't go in") .setParam("i_process_start_time", now()) @@ -285,5 +283,4 @@ class WriteCheckpointIntegrationTests extends DBTestSuite { } assert(table("runs.checkpoints").count() == count) } - } diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointV1IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala similarity index 88% rename from database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointV1IntegrationTests.scala rename to database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala index 6d26748b0..15f339ce3 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointV1IntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointOverloadedIntegrationTests.scala @@ -23,9 +23,9 @@ import za.co.absa.balta.classes.setter.CustomDBType import java.time.OffsetDateTime import java.util.UUID -class WriteCheckpointV1IntegrationTests extends DBTestSuite { +class WriteCheckpointOverloadedIntegrationTests extends DBTestSuite { - private val fnWriteCheckpointV1 = "runs.write_checkpoint_v1" + private val fnWriteCheckpoint = "runs.write_checkpoint" private val partitioning = JsonBString( """ @@ -42,6 +42,42 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { |""".stripMargin ) + private val measurements = + """ + |{ + | "{ + | \"measure\": { + | \"measureName\": \"count\", + | \"measuredColumns\": [] + | }, + | \"result\":{ + | \"value\":\"3\", + | \"type\":\"int\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"col1\"] + | }, + | \"result\":{ + | \"value\":\"3.14\", + | \"type\":\"double\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"a\",\"b\"] + | }, + | \"result\":{ + | \"value\":\"2.71\", + | \"type\":\"double\" + | } + | }" + |} + |""".stripMargin + test("Write new checkpoint without data") { val uuid = UUID.randomUUID val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") @@ -64,8 +100,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) - function(fnWriteCheckpointV1) - .setParam("i_partitioning", partitioning) + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", fkPartitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Empty path") .setParam("i_process_start_time", startTime) @@ -100,41 +136,7 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { val user = "Franz Kafka" val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") - val measurements = - """ - |{ - | "{ - | \"measure\": { - | \"measureName\": \"count\", - | \"measuredColumns\": [] - | }, - | \"result\":{ - | \"value\":\"3\", - | \"type\":\"int\" - | } - | }", - | "{ - | \"measure\": { - | \"measureName\": \"avg\", - | \"measuredColumns\": [\"col1\"] - | }, - | \"result\":{ - | \"value\":\"3.14\", - | \"type\":\"double\" - | } - | }", - | "{ - | \"measure\": { - | \"measureName\": \"avg\", - | \"measuredColumns\": [\"a\",\"b\"] - | }, - | \"result\":{ - | \"value\":\"2.71\", - | \"type\":\"double\" - | } - | }" - |} - |""".stripMargin + table("runs.partitionings").insert( add("partitioning", partitioning) @@ -152,8 +154,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) - function(fnWriteCheckpointV1) - .setParam("i_partitioning", partitioning) + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", fkPartitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Happy path") .setParam("i_process_start_time", startTime) @@ -239,8 +241,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { .add("created_by", origAuthor) ) - function(fnWriteCheckpointV1) - .setParam("i_partitioning", partitioning) + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", fkPartitioning) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Won't go in") .setParam("i_process_start_time", now()) @@ -266,8 +268,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { test("Partitioning of the checkpoint does not exist") { val uuid = UUID.randomUUID val count = table("runs.checkpoints").count() - function(fnWriteCheckpointV1) - .setParam("i_partitioning", partitioning) + function(fnWriteCheckpoint) + .setParam("i_partitioning_id", 0L) .setParam("i_id_checkpoint", uuid) .setParam("i_checkpoint_name", "Won't go in") .setParam("i_process_start_time", now()) @@ -283,4 +285,5 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite { } assert(table("runs.checkpoints").count() == count) } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 146502c9b..5a07279b6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -55,8 +55,8 @@ object Main extends ZIOAppDefault with Server { GetPartitioningAdditionalData.layer, CreateOrUpdateAdditionalData.layer, GetPartitioningCheckpoints.layer, - WriteCheckpointV1.layer, WriteCheckpoint.layer, + WriteCheckpointV2.layer, GetFlowCheckpoints.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala index 3eb2ff5a2..ce6c5c129 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala @@ -17,6 +17,8 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.implicits.toSqlInterpolator +import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.server.model.PartitioningForDB import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus @@ -25,29 +27,28 @@ import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import zio._ import io.circe.syntax._ +import za.co.absa.atum.model.dto.MeasureResultDTO._ +import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut import doobie.postgres.implicits._ -import za.co.absa.atum.model.dto.CheckpointV2DTO -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint.WriteCheckpointArgs class WriteCheckpoint(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieSingleResultFunctionWithStatus[WriteCheckpointArgs, Unit, Task](args => + extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task](values => Seq( - fr"${args.partitioningId}", - fr"${args.checkpointV2DTO.id}", - fr"${args.checkpointV2DTO.name}", - fr"${args.checkpointV2DTO.processStartTime}", - fr"${args.checkpointV2DTO.processEndTime}", - fr"${args.checkpointV2DTO.measurements.toList.map(_.asJson)}", - fr"${args.checkpointV2DTO.measuredByAtumAgent}", - fr"${args.checkpointV2DTO.author}" + fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", + fr"${values.id}", + fr"${values.name}", + fr"${values.processStartTime}", + fr"${values.processEndTime}", + fr"${values.measurements.toList.map(_.asJson)}", + fr"${values.measuredByAtumAgent}", + fr"${values.author}" ) ) with StandardStatusHandling object WriteCheckpoint { - case class WriteCheckpointArgs(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO) - val layer: URLayer[PostgresDatabaseProvider, WriteCheckpoint] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV1.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala similarity index 58% rename from server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV1.scala rename to server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala index 00570927c..ff797dd70 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV1.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala @@ -17,8 +17,6 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.implicits.toSqlInterpolator -import za.co.absa.atum.model.dto.CheckpointDTO -import za.co.absa.atum.server.model.PartitioningForDB import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus @@ -27,31 +25,33 @@ import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import zio._ import io.circe.syntax._ -import za.co.absa.atum.model.dto.MeasureResultDTO._ -import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get -import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut import doobie.postgres.implicits._ +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs -class WriteCheckpointV1(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task](values => +class WriteCheckpointV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieSingleResultFunctionWithStatus[WriteCheckpointArgs, Unit, Task](args => Seq( - fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", - fr"${values.id}", - fr"${values.name}", - fr"${values.processStartTime}", - fr"${values.processEndTime}", - fr"${values.measurements.toList.map(_.asJson)}", - fr"${values.measuredByAtumAgent}", - fr"${values.author}" - ) + fr"${args.partitioningId}", + fr"${args.checkpointV2DTO.id}", + fr"${args.checkpointV2DTO.name}", + fr"${args.checkpointV2DTO.processStartTime}", + fr"${args.checkpointV2DTO.processEndTime}", + fr"${args.checkpointV2DTO.measurements.toList.map(_.asJson)}", + fr"${args.checkpointV2DTO.measuredByAtumAgent}", + fr"${args.checkpointV2DTO.author}" + ), + Some("write_checkpoint") ) with StandardStatusHandling -object WriteCheckpointV1 { - val layer: URLayer[PostgresDatabaseProvider, WriteCheckpointV1] = ZLayer { +object WriteCheckpointV2 { + case class WriteCheckpointArgs(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO) + + val layer: URLayer[PostgresDatabaseProvider, WriteCheckpointV2] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] - } yield new WriteCheckpointV1()(Runs, dbProvider.dbEngine) + } yield new WriteCheckpointV2()(Runs, dbProvider.dbEngine) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index bfdf5aba9..0fd132a4d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -17,33 +17,33 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint.WriteCheckpointArgs -import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV1} +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} import za.co.absa.atum.server.api.exception.DatabaseError import zio._ import zio.interop.catz.asyncInstance -class CheckpointRepositoryImpl(writeCheckpointV1Fn: WriteCheckpointV1, writeCheckpointFn: WriteCheckpoint) +class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint, writeCheckpointV2Fn: WriteCheckpointV2) extends CheckpointRepository with BaseRepository { override def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] = { - dbSingleResultCallWithStatus(writeCheckpointV1Fn(checkpointDTO), "writeCheckpoint") + dbSingleResultCallWithStatus(writeCheckpointFn(checkpointDTO), "writeCheckpoint") } override def writeCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[DatabaseError, Unit] = { dbSingleResultCallWithStatus( - writeCheckpointFn(WriteCheckpointArgs(partitioningId, checkpointV2DTO)), + writeCheckpointV2Fn(WriteCheckpointArgs(partitioningId, checkpointV2DTO)), "writeCheckpoint" ) } } object CheckpointRepositoryImpl { - val layer: URLayer[WriteCheckpointV1 with WriteCheckpoint, CheckpointRepository] = ZLayer { + val layer: URLayer[WriteCheckpoint with WriteCheckpointV2, CheckpointRepository] = ZLayer { for { - writeCheckpointV1 <- ZIO.service[WriteCheckpointV1] writeCheckpoint <- ZIO.service[WriteCheckpoint] - } yield new CheckpointRepositoryImpl(writeCheckpointV1, writeCheckpoint) + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + } yield new CheckpointRepositoryImpl(writeCheckpoint, writeCheckpointV2) } } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala index 39662ac48..b5a93234f 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala @@ -17,14 +17,13 @@ package za.co.absa.atum.server.api.database.runs.functions import za.co.absa.atum.model.ResultValueType -import za.co.absa.atum.model.dto._ +import za.co.absa.atum.model.dto.{CheckpointDTO, MeasureDTO, MeasureResultDTO, MeasurementDTO, PartitionDTO} import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint.WriteCheckpointArgs -import za.co.absa.db.fadb.exceptions.DataConflictException -import za.co.absa.db.fadb.status.{FunctionStatus, Row} +import za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException} +import za.co.absa.db.fadb.status.FunctionStatus import zio._ import zio.interop.catz.asyncInstance import zio.test._ @@ -36,12 +35,13 @@ object WriteCheckpointIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { - suite("WriteCheckpointV2Suite")( + suite("WriteCheckpointSuite")( test("Returns expected Left with DataNotFoundException as related partitioning is not in the database") { - val checkpointV2DTO = CheckpointV2DTO( + val checkpointDTO = CheckpointDTO( id = UUID.randomUUID(), name = "name", author = "author", + partitioning = Seq(PartitionDTO("key4", "value4")), processStartTime = ZonedDateTime.now(), processEndTime = Option(ZonedDateTime.now()), measurements = Set( @@ -49,8 +49,8 @@ object WriteCheckpointIntegrationTests extends ConfigProviderTest { ) ) for { - writeCheckpointV2 <- ZIO.service[WriteCheckpoint] - result <- writeCheckpointV2(WriteCheckpointArgs(1L, checkpointV2DTO)) + writeCheckpoint <- ZIO.service[WriteCheckpoint] + result <- writeCheckpoint(checkpointDTO) } yield assertTrue(result == Left(DataConflictException(FunctionStatus(32, "Partitioning not found")))) } ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV1IntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala similarity index 76% rename from server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV1IntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala index 988074be9..87a2ec89a 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV1IntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2IntegrationTests.scala @@ -17,13 +17,14 @@ package za.co.absa.atum.server.api.database.runs.functions import za.co.absa.atum.model.ResultValueType -import za.co.absa.atum.model.dto.{CheckpointDTO, MeasureDTO, MeasureResultDTO, MeasurementDTO, PartitionDTO} +import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException} -import za.co.absa.db.fadb.status.FunctionStatus +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs +import za.co.absa.db.fadb.exceptions.DataConflictException +import za.co.absa.db.fadb.status.{FunctionStatus, Row} import zio._ import zio.interop.catz.asyncInstance import zio.test._ @@ -31,17 +32,16 @@ import zio.test._ import java.time.ZonedDateTime import java.util.UUID -object WriteCheckpointV1IntegrationTests extends ConfigProviderTest { +object WriteCheckpointV2IntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { - suite("WriteCheckpointSuite")( + suite("WriteCheckpointV2Suite")( test("Returns expected Left with DataNotFoundException as related partitioning is not in the database") { - val checkpointDTO = CheckpointDTO( + val checkpointV2DTO = CheckpointV2DTO( id = UUID.randomUUID(), name = "name", author = "author", - partitioning = Seq(PartitionDTO("key4", "value4")), processStartTime = ZonedDateTime.now(), processEndTime = Option(ZonedDateTime.now()), measurements = Set( @@ -49,12 +49,12 @@ object WriteCheckpointV1IntegrationTests extends ConfigProviderTest { ) ) for { - writeCheckpoint <- ZIO.service[WriteCheckpointV1] - result <- writeCheckpoint(checkpointDTO) + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + result <- writeCheckpointV2(WriteCheckpointArgs(1L, checkpointV2DTO)) } yield assertTrue(result == Left(DataConflictException(FunctionStatus(32, "Partitioning not found")))) } ).provide( - WriteCheckpointV1.layer, + WriteCheckpointV2.layer, PostgresDatabaseProvider.layer, TestTransactorProvider.layerWithRollback ) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index bac3a078b..407f5607b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -17,10 +17,10 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV1} +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint.WriteCheckpointArgs +import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.db.fadb.exceptions.DataConflictException import za.co.absa.db.fadb.status.FunctionStatus @@ -32,25 +32,25 @@ import za.co.absa.db.fadb.status.Row object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { - private val writeCheckpointV1Mock: WriteCheckpointV1 = mock(classOf[WriteCheckpointV1]) private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) + private val writeCheckpointV2Mock: WriteCheckpointV2 = mock(classOf[WriteCheckpointV2]) - when(writeCheckpointV1Mock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) - when(writeCheckpointV1Mock.apply(checkpointDTO2)) + when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) + when(writeCheckpointMock.apply(checkpointDTO2)) .thenReturn(ZIO.fail(DataConflictException(FunctionStatus(31, "conflict")))) - when(writeCheckpointV1Mock.apply(checkpointDTO3)).thenReturn(ZIO.fail(new Exception("boom!"))) + when(writeCheckpointMock.apply(checkpointDTO3)).thenReturn(ZIO.fail(new Exception("boom!"))) private val partitioningId = 1L - when(writeCheckpointMock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO1))) + when(writeCheckpointV2Mock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO1))) .thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) - when(writeCheckpointMock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO2))) + when(writeCheckpointV2Mock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO2))) .thenReturn(ZIO.left(DataConflictException(FunctionStatus(32, "Partitioning not found")))) - when(writeCheckpointMock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO3))) + when(writeCheckpointV2Mock.apply(WriteCheckpointArgs(partitioningId, checkpointV2DTO3))) .thenReturn(ZIO.fail(new Exception("boom!"))) - private val writeCheckpointV1MockLayer = ZLayer.succeed(writeCheckpointV1Mock) private val writeCheckpointMockLayer = ZLayer.succeed(writeCheckpointMock) + private val writeCheckpointV2MockLayer = ZLayer.succeed(writeCheckpointV2Mock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -87,7 +87,7 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ) - ).provide(CheckpointRepositoryImpl.layer, writeCheckpointV1MockLayer, writeCheckpointMockLayer) + ).provide(CheckpointRepositoryImpl.layer, writeCheckpointMockLayer, writeCheckpointV2MockLayer) }