Skip to content

Commit

Permalink
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
Browse files Browse the repository at this point in the history
…1-get-partitioning-checkpoint-232-based

# Conflicts:
#	server/src/main/scala/za/co/absa/atum/server/Main.scala
#	server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala
#	server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala
#	server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala
  • Loading branch information
salamonpavel committed Aug 20, 2024
2 parents a6ada8d + 867cf40 commit ab7607e
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/


CREATE OR REPLACE FUNCTION runs.write_checkpoint_v1(
CREATE OR REPLACE FUNCTION runs.write_checkpoint(
IN i_partitioning JSONB,
IN i_id_checkpoint UUID,
IN i_checkpoint_name TEXT,
Expand All @@ -29,7 +29,7 @@ CREATE OR REPLACE FUNCTION runs.write_checkpoint_v1(
$$
-------------------------------------------------------------------------------
--
-- Function: runs.write_checkpoint_v1(10)
-- Function: runs.write_checkpoint(10)
-- Creates a checkpoint and adds all the measurements that it consists of
--
-- Parameters:
Expand Down Expand Up @@ -63,28 +63,33 @@ $$
-------------------------------------------------------------------------------
DECLARE
_fk_partitioning BIGINT;
result RECORD;
BEGIN

_fk_partitioning = runs._get_id_partitioning(i_partitioning);

result = runs.write_checkpoint(
_fk_partitioning,
i_id_checkpoint,
i_checkpoint_name,
i_process_start_time,
i_process_end_time,
i_measurements,
i_measured_by_atum_agent,
i_by_user
);
IF _fk_partitioning IS NULL THEN
status := 32;
status_text := 'Partitioning not found';
RETURN;
END IF;

SELECT WC.status, WC.status_text
FROM runs.write_checkpoint(
_fk_partitioning,
i_id_checkpoint,
i_checkpoint_name,
i_process_start_time,
i_process_end_time,
i_measurements,
i_measured_by_atum_agent,
i_by_user
) WC
INTO status, status_text;

status := result.status;
status_text := result.status_text;
RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.write_checkpoint_v1(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.write_checkpoint_v1(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user;
ALTER FUNCTION runs.write_checkpoint(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.write_checkpoint(JSONB, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user;
Original file line number Diff line number Diff line change
Expand Up @@ -42,42 +42,6 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
|""".stripMargin
)

private val measurements =
"""
|{
| "{
| \"measure\": {
| \"measureName\": \"count\",
| \"measuredColumns\": []
| },
| \"result\":{
| \"value\":\"3\",
| \"type\":\"int\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"col1\"]
| },
| \"result\":{
| \"value\":\"3.14\",
| \"type\":\"double\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"a\",\"b\"]
| },
| \"result\":{
| \"value\":\"2.71\",
| \"type\":\"double\"
| }
| }"
|}
|""".stripMargin

test("Write new checkpoint without data") {
val uuid = UUID.randomUUID
val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z")
Expand All @@ -101,7 +65,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0)

function(fnWriteCheckpoint)
.setParam("i_partitioning_id", fkPartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Empty path")
.setParam("i_process_start_time", startTime)
Expand Down Expand Up @@ -136,7 +100,41 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
val user = "Franz Kafka"
val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z")
val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z")

val measurements =
"""
|{
| "{
| \"measure\": {
| \"measureName\": \"count\",
| \"measuredColumns\": []
| },
| \"result\":{
| \"value\":\"3\",
| \"type\":\"int\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"col1\"]
| },
| \"result\":{
| \"value\":\"3.14\",
| \"type\":\"double\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"a\",\"b\"]
| },
| \"result\":{
| \"value\":\"2.71\",
| \"type\":\"double\"
| }
| }"
|}
|""".stripMargin

table("runs.partitionings").insert(
add("partitioning", partitioning)
Expand All @@ -155,7 +153,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0)

function(fnWriteCheckpoint)
.setParam("i_partitioning_id", fkPartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Happy path")
.setParam("i_process_start_time", startTime)
Expand Down Expand Up @@ -242,7 +240,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
)

function(fnWriteCheckpoint)
.setParam("i_partitioning_id", fkPartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Won't go in")
.setParam("i_process_start_time", now())
Expand All @@ -269,7 +267,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
val uuid = UUID.randomUUID
val count = table("runs.checkpoints").count()
function(fnWriteCheckpoint)
.setParam("i_partitioning_id", 0L)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Won't go in")
.setParam("i_process_start_time", now())
Expand All @@ -285,5 +283,4 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
}
assert(table("runs.checkpoints").count() == count)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import za.co.absa.balta.classes.setter.CustomDBType
import java.time.OffsetDateTime
import java.util.UUID

class WriteCheckpointV1IntegrationTests extends DBTestSuite {
class WriteCheckpointOverloadedIntegrationTests extends DBTestSuite {

private val fnWriteCheckpointV1 = "runs.write_checkpoint_v1"
private val fnWriteCheckpoint = "runs.write_checkpoint"

private val partitioning = JsonBString(
"""
Expand All @@ -42,6 +42,42 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {
|""".stripMargin
)

private val measurements =
"""
|{
| "{
| \"measure\": {
| \"measureName\": \"count\",
| \"measuredColumns\": []
| },
| \"result\":{
| \"value\":\"3\",
| \"type\":\"int\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"col1\"]
| },
| \"result\":{
| \"value\":\"3.14\",
| \"type\":\"double\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"a\",\"b\"]
| },
| \"result\":{
| \"value\":\"2.71\",
| \"type\":\"double\"
| }
| }"
|}
|""".stripMargin

test("Write new checkpoint without data") {
val uuid = UUID.randomUUID
val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z")
Expand All @@ -64,8 +100,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {

assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0)

function(fnWriteCheckpointV1)
.setParam("i_partitioning", partitioning)
function(fnWriteCheckpoint)
.setParam("i_partitioning_id", fkPartitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Empty path")
.setParam("i_process_start_time", startTime)
Expand Down Expand Up @@ -100,41 +136,7 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {
val user = "Franz Kafka"
val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z")
val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z")
val measurements =
"""
|{
| "{
| \"measure\": {
| \"measureName\": \"count\",
| \"measuredColumns\": []
| },
| \"result\":{
| \"value\":\"3\",
| \"type\":\"int\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"col1\"]
| },
| \"result\":{
| \"value\":\"3.14\",
| \"type\":\"double\"
| }
| }",
| "{
| \"measure\": {
| \"measureName\": \"avg\",
| \"measuredColumns\": [\"a\",\"b\"]
| },
| \"result\":{
| \"value\":\"2.71\",
| \"type\":\"double\"
| }
| }"
|}
|""".stripMargin


table("runs.partitionings").insert(
add("partitioning", partitioning)
Expand All @@ -152,8 +154,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {

assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0)

function(fnWriteCheckpointV1)
.setParam("i_partitioning", partitioning)
function(fnWriteCheckpoint)
.setParam("i_partitioning_id", fkPartitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Happy path")
.setParam("i_process_start_time", startTime)
Expand Down Expand Up @@ -239,8 +241,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {
.add("created_by", origAuthor)
)

function(fnWriteCheckpointV1)
.setParam("i_partitioning", partitioning)
function(fnWriteCheckpoint)
.setParam("i_partitioning_id", fkPartitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Won't go in")
.setParam("i_process_start_time", now())
Expand All @@ -266,8 +268,8 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {
test("Partitioning of the checkpoint does not exist") {
val uuid = UUID.randomUUID
val count = table("runs.checkpoints").count()
function(fnWriteCheckpointV1)
.setParam("i_partitioning", partitioning)
function(fnWriteCheckpoint)
.setParam("i_partitioning_id", 0L)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Won't go in")
.setParam("i_process_start_time", now())
Expand All @@ -283,4 +285,5 @@ class WriteCheckpointV1IntegrationTests extends DBTestSuite {
}
assert(table("runs.checkpoints").count() == count)
}

}
2 changes: 1 addition & 1 deletion server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ object Main extends ZIOAppDefault with Server {
GetPartitioningAdditionalData.layer,
CreateOrUpdateAdditionalData.layer,
GetPartitioningCheckpoints.layer,
WriteCheckpointV1.layer,
WriteCheckpoint.layer,
WriteCheckpointV2.layer,
GetPartitioningCheckpointV2.layer,
GetFlowCheckpoints.layer,
PostgresDatabaseProvider.layer,
Expand Down
Loading

0 comments on commit ab7607e

Please sign in to comment.