Skip to content

Commit

Permalink
Feature/232 post checkpoint (#239)
Browse files Browse the repository at this point in the history
postCheckpointEndpointV2
  • Loading branch information
salamonpavel authored Aug 21, 2024
1 parent 202dab2 commit 0037625
Show file tree
Hide file tree
Showing 48 changed files with 1,009 additions and 283 deletions.
49 changes: 15 additions & 34 deletions database/src/main/postgres/runs/V1.5.10__write_checkpoint.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,54 +57,35 @@ $$
--
-- Status codes:
-- 11 - Checkpoint created
-- 14 - Checkpoint already present
-- 41 - Partitioning not found
-- 31 - Checkpoint already present
-- 32 - Partitioning not found
--
-------------------------------------------------------------------------------
DECLARE
_fk_partitioning BIGINT;
BEGIN

PERFORM 1
FROM runs.checkpoints CP
WHERE CP.id_checkpoint = i_id_checkpoint;

IF found THEN
status := 14;
status_text := 'Checkpoint already present';
RETURN;
END IF;

_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
status := 41;
status := 32;
status_text := 'Partitioning not found';
RETURN;
END IF;

INSERT INTO runs.checkpoints (id_checkpoint, fk_partitioning,
checkpoint_name, measured_by_atum_agent,
process_start_time, process_end_time, created_by)
VALUES (i_id_checkpoint, _fk_partitioning,
i_checkpoint_name, i_measured_by_atum_agent,
i_process_start_time, i_process_end_time, i_by_user);

-- maybe could use `jsonb_populate_record` function to be little bit more effective
PERFORM runs._write_measurement(
i_id_checkpoint,
_fk_partitioning,
UN.measurement->'measure'->>'measureName',
jsonb_array_to_text_array(UN.measurement->'measure'->'measuredColumns'),
UN.measurement->'result',
i_by_user
)
FROM (
SELECT unnest(i_measurements) AS measurement
) UN;
SELECT WC.status, WC.status_text
FROM runs.write_checkpoint(
_fk_partitioning,
i_id_checkpoint,
i_checkpoint_name,
i_process_start_time,
i_process_end_time,
i_measurements,
i_measured_by_atum_agent,
i_by_user
) WC
INTO status, status_text;

status := 11;
status_text := 'Checkpoint created';
RETURN;
END;
$$
Expand Down
109 changes: 109 additions & 0 deletions database/src/main/postgres/runs/V1.9.2__write_checkpoint.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.write_checkpoint(
IN i_partitioning_id BIGINT,
IN i_id_checkpoint UUID,
IN i_checkpoint_name TEXT,
IN i_process_start_time TIMESTAMP WITH TIME ZONE,
IN i_process_end_time TIMESTAMP WITH TIME ZONE,
IN i_measurements JSONB[],
IN i_measured_by_atum_agent BOOLEAN,
IN i_by_user TEXT,
OUT status INTEGER,
OUT status_text TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.write_checkpoint(8)
-- Creates a checkpoint and adds all the measurements that it consists of
--
-- Parameters:
-- i_partitioning_id - ID of the partitioning the measure belongs to
-- i_id_checkpoint - reference to the checkpoint this measure belongs into
-- i_checkpoint_name - name of the checkpoint
-- i_process_start_time - the start of processing (measuring) of the checkpoint
-- i_process_end_time - the end of the processing (measuring) of the checkpoint
-- i_measurements - array of JSON objects of the following format (values of the keys are examples only)
-- {
-- "measure": {
-- "measureName": "count",
-- "measuredColumns": ["a","b"]
-- },
-- "result": {
-- whatever here
-- }
-- }
-- i_measured_by_atum_agent - flag it the checkpoint was measured by Atum or data provided by user
-- i_by_user - user behind the change
--
-- Returns:
-- status - Status code
-- status_text - Status text
--
-- Status codes:
-- 11 - Checkpoint created
-- 31 - Conflict, checkpoint already present
-- 32 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
-- Check if partitioning exists
IF NOT EXISTS (SELECT 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id) THEN
status := 32;
status_text := 'Partitioning not found';
RETURN;
END IF;

PERFORM 1
FROM runs.checkpoints CP
WHERE CP.id_checkpoint = i_id_checkpoint;

IF found THEN
status := 31;
status_text := 'Checkpoint already present';
RETURN;
END IF;

INSERT INTO runs.checkpoints (id_checkpoint, fk_partitioning,
checkpoint_name, measured_by_atum_agent,
process_start_time, process_end_time, created_by)
VALUES (i_id_checkpoint, i_partitioning_id,
i_checkpoint_name, i_measured_by_atum_agent,
i_process_start_time, i_process_end_time, i_by_user);

-- maybe could use `jsonb_populate_record` function to be little bit more effective
PERFORM runs._write_measurement(
i_id_checkpoint,
i_partitioning_id,
UN.measurement->'measure'->>'measureName',
jsonb_array_to_text_array(UN.measurement->'measure'->'measuredColumns'),
UN.measurement->'result',
i_by_user
)
FROM (
SELECT unnest(i_measurements) AS measurement
) UN;

status := 11;
status_text := 'Checkpoint created';
RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.write_checkpoint(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.write_checkpoint(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user;
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import java.util.UUID

class WriteCheckpointIntegrationTests extends DBTestSuite {

private val fnWriteCheckpoint = "runs.write_checkpoint"

private val partitioning = JsonBString(
"""
|{
Expand Down Expand Up @@ -62,7 +64,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {

assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0)

function("runs.write_checkpoint")
function(fnWriteCheckpoint)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Empty path")
Expand Down Expand Up @@ -150,7 +152,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {

assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0)

function("runs.write_checkpoint")
function(fnWriteCheckpoint)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Happy path")
Expand Down Expand Up @@ -237,7 +239,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
.add("created_by", origAuthor)
)

function("runs.write_checkpoint")
function(fnWriteCheckpoint)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Won't go in")
Expand All @@ -249,7 +251,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(14))
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Checkpoint already present"))
}

Expand All @@ -264,7 +266,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
test("Partitioning of the checkpoint does not exist") {
val uuid = UUID.randomUUID
val count = table("runs.checkpoints").count()
function("runs.write_checkpoint")
function(fnWriteCheckpoint)
.setParam("i_partitioning", partitioning)
.setParam("i_id_checkpoint", uuid)
.setParam("i_checkpoint_name", "Won't go in")
Expand All @@ -276,7 +278,7 @@ class WriteCheckpointIntegrationTests extends DBTestSuite {
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(41))
assert(row.getInt("status").contains(32))
assert(row.getString("status_text").contains("Partitioning not found"))
}
assert(table("runs.checkpoints").count() == count)
Expand Down
Loading

0 comments on commit 0037625

Please sign in to comment.