Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get checkpoint #241

Merged
merged 58 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
682354e
postCheckpointEndpointV2
salamonpavel Aug 5, 2024
b72ebfa
postCheckpointV2
salamonpavel Aug 6, 2024
7b7819c
write_checkpoint_v2
salamonpavel Aug 6, 2024
b4604c4
fix existing tests
salamonpavel Aug 6, 2024
4b89a3a
fixes related to error model
salamonpavel Aug 6, 2024
47dea31
fixes related to error model
salamonpavel Aug 6, 2024
cc4af62
param name fix
salamonpavel Aug 6, 2024
bb6331e
exceptions imports
salamonpavel Aug 6, 2024
19e6382
exceptions imports
salamonpavel Aug 6, 2024
a3ffb44
PostCheckpointEndpointV2UnitTests
salamonpavel Aug 7, 2024
928823d
tests
salamonpavel Aug 7, 2024
bf47223
tests
salamonpavel Aug 7, 2024
c4245eb
remove unused import
salamonpavel Aug 7, 2024
5133272
get checkpoint
salamonpavel Aug 8, 2024
b4ef957
checkpointv2dto
salamonpavel Aug 8, 2024
69169ba
checkpointv2dto
salamonpavel Aug 8, 2024
519790a
checkpointv2dto
salamonpavel Aug 8, 2024
6e3588b
fix
salamonpavel Aug 8, 2024
61d587b
fix
salamonpavel Aug 8, 2024
a46e3aa
fix
salamonpavel Aug 8, 2024
31652e5
fix
salamonpavel Aug 8, 2024
75b83b1
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
salamonpavel Aug 8, 2024
c45b35d
fix
salamonpavel Aug 8, 2024
63df9ec
fix
salamonpavel Aug 9, 2024
900bdc2
fix
salamonpavel Aug 9, 2024
3704e4c
remove toCheckpointV2DTO method
salamonpavel Aug 9, 2024
6bd0a8c
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
salamonpavel Aug 9, 2024
b6bbfa4
GetPartitioningCheckpointV2EndpointUnitTests
salamonpavel Aug 9, 2024
cccb0c1
tests
salamonpavel Aug 9, 2024
44b9f36
tests
salamonpavel Aug 12, 2024
68ee4b2
tests
salamonpavel Aug 12, 2024
f012746
comments addressed
salamonpavel Aug 13, 2024
c361d18
formatting
salamonpavel Aug 13, 2024
451a56a
formatting
salamonpavel Aug 13, 2024
4b40110
refactoring as per discussion
salamonpavel Aug 13, 2024
2a62b39
refactoring as per discussion
salamonpavel Aug 13, 2024
896c085
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
salamonpavel Aug 13, 2024
bbe5efa
refactoring as per discussion
salamonpavel Aug 13, 2024
b8363a9
refactoring as per discussion
salamonpavel Aug 13, 2024
e99d863
refactoring as per discussion
salamonpavel Aug 13, 2024
f65b7c5
fixes
salamonpavel Aug 13, 2024
c84847b
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
salamonpavel Aug 13, 2024
4ad5505
docs for sql updated
salamonpavel Aug 14, 2024
03bdcbc
Merge branch 'feature/232-post-checkpoint' into feature/231-get-parti…
salamonpavel Aug 14, 2024
f76e6f8
extend Exception instead of Throwable
salamonpavel Aug 15, 2024
73af31a
Update database/src/main/postgres/runs/V1.9.2__write_checkpoint_v2.sql
salamonpavel Aug 15, 2024
4a8eca2
github comments addressed
salamonpavel Aug 19, 2024
2418cfd
v1 endpoint changed
salamonpavel Aug 19, 2024
ac41084
v1 tests
salamonpavel Aug 19, 2024
aec2ec1
v1 tests
salamonpavel Aug 19, 2024
6574bf1
v1 tests
salamonpavel Aug 19, 2024
d8acd15
rename
salamonpavel Aug 19, 2024
6c3bbb3
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
salamonpavel Aug 19, 2024
a6ada8d
merge conflicts resolved
salamonpavel Aug 19, 2024
867cf40
overloaded sql code
salamonpavel Aug 20, 2024
ab7607e
Merge branch 'refs/heads/feature/232-post-checkpoint' into feature/23…
salamonpavel Aug 20, 2024
5c21538
conflicts resolved
salamonpavel Aug 20, 2024
5336dcd
Merge branch 'refs/heads/master' into feature/231-get-partitioning-ch…
salamonpavel Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions database/src/main/postgres/runs/V1.9.2__write_checkpoint_v2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.write_checkpoint_v2(
    IN  i_partitioning_id           BIGINT,
    IN  i_id_checkpoint             UUID,
    IN  i_checkpoint_name           TEXT,
    IN  i_process_start_time        TIMESTAMP WITH TIME ZONE,
    IN  i_process_end_time          TIMESTAMP WITH TIME ZONE,
    IN  i_measurements              JSONB[],
    IN  i_measured_by_atum_agent    BOOLEAN,
    IN  i_by_user                   TEXT,
    OUT status                      INTEGER,
    OUT status_text                 TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.write_checkpoint_v2(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE,
--                                    TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT)
--      Stores a new checkpoint for an existing partitioning and writes every
--      measurement supplied with it.
--
-- Parameters:
--      i_partitioning_id           - ID of the partitioning the checkpoint is attached to
--      i_id_checkpoint             - ID under which the new checkpoint is stored
--      i_checkpoint_name           - name of the checkpoint
--      i_process_start_time        - start of the processing (measuring) of the checkpoint
--      i_process_end_time          - end of the processing (measuring) of the checkpoint
--      i_measurements              - array of JSON objects; the keys read by this function are
--                                    shown below (the values are examples only)
--                                    {
--                                      "measure": {
--                                        "measureName": "count",
--                                        "measuredColumns": ["a","b"]
--                                      },
--                                      "result": {
--                                        whatever here
--                                      }
--                                    }
--      i_measured_by_atum_agent    - flag stored with the checkpoint telling whether it was
--                                    measured by Atum or the data was provided by the user
--      i_by_user                   - user behind the change
--
-- Returns:
--      status                      - Status code
--      status_text                 - Status text
--
-- Status codes:
--      11                          - Checkpoint created
--      31                          - Conflict, checkpoint already present
--      32                          - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
    -- Guard 1: the target partitioning has to exist.
    IF NOT EXISTS (
        SELECT 1
        FROM runs.partitionings
        WHERE id_partitioning = i_partitioning_id
    ) THEN
        status := 32;
        status_text := 'Partitioning not found';
        RETURN;
    END IF;

    -- Guard 2: a checkpoint with the same ID must not be stored yet.
    IF EXISTS (
        SELECT 1
        FROM runs.checkpoints CP
        WHERE CP.id_checkpoint = i_id_checkpoint
    ) THEN
        status := 31;
        status_text := 'Checkpoint already present';
        RETURN;
    END IF;

    INSERT INTO runs.checkpoints (
        id_checkpoint,
        fk_partitioning,
        checkpoint_name,
        measured_by_atum_agent,
        process_start_time,
        process_end_time,
        created_by
    )
    VALUES (
        i_id_checkpoint,
        i_partitioning_id,
        i_checkpoint_name,
        i_measured_by_atum_agent,
        i_process_start_time,
        i_process_end_time,
        i_by_user
    );

    -- One call of runs._write_measurement per element of i_measurements.
    -- maybe could use `jsonb_populate_record` function to be little bit more effective
    PERFORM runs._write_measurement(
        i_id_checkpoint,
        i_partitioning_id,
        UN.measurement->'measure'->>'measureName',
        jsonb_array_to_text_array(UN.measurement->'measure'->'measuredColumns'),
        UN.measurement->'result',
        i_by_user
    )
    FROM unnest(i_measurements) AS UN(measurement);

    status := 11;
    status_text := 'Checkpoint created';
    RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.write_checkpoint_v2(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.write_checkpoint_v2(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoint_v2(
    IN  i_partitioning_id       BIGINT,
    IN  i_checkpoint_id         UUID,
    OUT status                  INTEGER,
    OUT status_text             TEXT,
    OUT id_checkpoint           UUID,
    OUT checkpoint_name         TEXT,
    OUT author                  TEXT,
    OUT measured_by_atum_agent  BOOLEAN,
    OUT measure_name            TEXT,
    OUT measured_columns        TEXT[],
    OUT measurement_value       JSONB,
    OUT checkpoint_start_time   TIMESTAMP WITH TIME ZONE,
    OUT checkpoint_end_time     TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_checkpoint_v2(BIGINT, UUID)
--      Retrieves a single checkpoint (measures and their measurement details) related to a
--      given partitioning and checkpoint ID. One row is returned per measurement of the
--      checkpoint; on failure a single row carrying only status and status_text is returned.
--
-- Parameters:
--      i_partitioning_id       - ID of the partitioning
--      i_checkpoint_id         - ID of the checkpoint
--
-- Returns:
--      status                  - Status code
--      status_text             - Status message
--      id_checkpoint           - ID of the checkpoint
--      checkpoint_name         - Name of the checkpoint
--      author                  - Author of the checkpoint (its created_by value)
--      measured_by_atum_agent  - Flag indicating whether the checkpoint was measured by ATUM agent
--      measure_name            - Name of the measure
--      measured_columns        - Columns of the measure
--      measurement_value       - Value of the measurement
--      checkpoint_start_time   - Start time of the checkpoint computation (process_start_time)
--      checkpoint_end_time     - End time of the checkpoint computation (process_end_time)
--
-- Status codes:
--      11 - OK
--      41 - Partitioning not found
--      42 - Checkpoint not found
--
-------------------------------------------------------------------------------
BEGIN
    PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
    IF NOT FOUND THEN
        status := 41;
        status_text := 'Partitioning not found';
        -- Emit one row holding just the status; the remaining OUT columns stay NULL.
        RETURN NEXT;
        RETURN;
    END IF;

    -- NOTE(review): both joins are inner joins, so a checkpoint without any measurement
    -- produces no row here and is reported as 42 below - confirm this is intended.
    RETURN QUERY
        SELECT
            11 AS status,
            'Ok' AS status_text,
            C.id_checkpoint,
            C.checkpoint_name,
            C.created_by AS author,
            C.measured_by_atum_agent,
            MD.measure_name,
            MD.measured_columns,
            M.measurement_value,
            C.process_start_time AS checkpoint_start_time,
            C.process_end_time AS checkpoint_end_time
        FROM
            runs.checkpoints C
        JOIN
            runs.measurements M ON C.id_checkpoint = M.fk_checkpoint
        JOIN
            runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
        WHERE
            C.fk_partitioning = i_partitioning_id
        AND
            C.id_checkpoint = i_checkpoint_id;

    -- FOUND is set by RETURN QUERY: false means the query above produced no rows.
    IF NOT FOUND THEN
        status := 42;
        status_text := 'Checkpoint not found';
        RETURN NEXT;
        RETURN;
    END IF;
END;
$$

LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_checkpoint_v2(BIGINT, UUID) OWNER TO atum_owner;

-- Execute is granted to atum_user, matching runs.write_checkpoint_v2; the owner role
-- already holds the privilege, so granting it to atum_owner had no effect.
GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoint_v2(BIGINT, UUID) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString
import za.co.absa.balta.classes.setter.CustomDBType

import java.time.OffsetDateTime
import java.util.UUID

/**
 * Integration tests of the database function `runs.get_partitioning_checkpoint_v2`.
 *
 * Each test prepares rows in the `runs` tables, calls the function and checks the returned
 * status together with the returned data:
 *  - status 11 for a checkpoint that has a measurement,
 *  - status 42 when the partitioning exists but the checkpoint does not,
 *  - status 41 when the partitioning itself does not exist.
 */
class GetPartitioningCheckpointV2IntegrationTests extends DBTestSuite {

  private val fncGetPartitioningCheckpointV2 = "runs.get_partitioning_checkpoint_v2"

  // NOTE(review): not referenced anywhere in this suite - kept in case it is used elsewhere.
  case class MeasuredDetails(
    measureName: String,
    measureColumns: Seq[String],
    measurementValue: JsonBString
  )

  private val partitioning1 = JsonBString(
    """
      |{
      |  "version": 1,
      |  "keys": ["keyX", "keyY", "keyZ"],
      |  "keysToValues": {
      |    "keyX": "value1",
      |    "keyZ": "value3",
      |    "keyY": "value2"
      |  }
      |}
      |""".stripMargin
  )

  private val partitioning2 = JsonBString(
    """
      |{
      |  "version": 1,
      |  "keys": ["key1", "key3", "key2", "key4"],
      |  "keysToValues": {
      |    "key1": "valueX",
      |    "key2": "valueY",
      |    "key3": "valueZ",
      |    "key4": "valueA"
      |  }
      |}
      |""".stripMargin
  )

  private val measurement1 = JsonBString("1")

  private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]")

  test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") {

    val uuid = UUID.randomUUID
    val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z")
    val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z")

    val id_measure_definition: Long = 1

    table("runs.partitionings").insert(
      add("partitioning", partitioning1)
        .add("created_by", "Daniel")
    )

    // The ID is generated by the database, so it is read back via the partitioning JSON.
    val fkPartitioning1: Long = table("runs.partitionings")
      .fieldValue("partitioning", partitioning1, "id_partitioning")
      .get
      .get

    table("runs.checkpoints").insert(
      add("id_checkpoint", uuid)
        .add("fk_partitioning", fkPartitioning1)
        .add("checkpoint_name", "checkpoint_1")
        .add("process_start_time", startTime)
        .add("process_end_time", endTime)
        .add("measured_by_atum_agent", true)
        .add("created_by", "Daniel")
    )

    table("runs.measure_definitions").insert(
      add("id_measure_definition", id_measure_definition)
        .add("fk_partitioning", fkPartitioning1)
        .add("created_by", "Daniel")
        .add("measure_name", "measure_1")
        .add("measured_columns", measured_columns)
    )

    table("runs.measurements").insert(
      add("fk_checkpoint", uuid)
        .add("fk_measure_definition", id_measure_definition)
        .add("measurement_value", measurement1)
    )

    function(fncGetPartitioningCheckpointV2)
      .setParam("i_partitioning_id", fkPartitioning1)
      .setParam("i_checkpoint_id", uuid)
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val results = queryResult.next()
        assert(results.getInt("status").contains(11))
        assert(results.getString("status_text").contains("Ok"))
        assert(results.getString("checkpoint_name").contains("checkpoint_1"))
        assert(results.getUUID("id_checkpoint").contains(uuid))
        assert(results.getString("author").contains("Daniel"))
        assert(results.getBoolean("measured_by_atum_agent").contains(true))
        assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime))
        assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime))
        assert(results.getJsonB("measurement_value").contains(measurement1))
        assert(results.getString("measure_name").contains("measure_1"))
        // Exactly one measurement was inserted, so exactly one row is expected.
        assert(!queryResult.hasNext)
      }
  }

  test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") {

    table("runs.partitionings").insert(
      add("partitioning", partitioning2)
        .add("created_by", "Daniel")
    )

    val fkPartitioning2: Long = table("runs.partitionings")
      .fieldValue("partitioning", partitioning2, "id_partitioning")
      .get
      .get

    function(fncGetPartitioningCheckpointV2)
      .setParam("i_partitioning_id", fkPartitioning2)
      .setParam("i_checkpoint_id", UUID.randomUUID())
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val results = queryResult.next()
        assert(results.getInt("status").contains(42))
        assert(results.getString("status_text").contains("Checkpoint not found"))
        // The failure is reported as a single status row.
        assert(!queryResult.hasNext)
      }

  }

  test("Get partitioning checkpoints no checkpoints non-existent partitionings") {

    function(fncGetPartitioningCheckpointV2)
      .setParam("i_partitioning_id", 0L)
      .setParam("i_checkpoint_id", UUID.randomUUID())
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val results = queryResult.next()
        assert(results.getInt("status").contains(41))
        assert(results.getString("status_text").contains("Partitioning not found"))
        // The failure is reported as a single status row.
        assert(!queryResult.hasNext)
      }

  }

}
Loading
Loading