Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/224 post partitioning #258

Merged
merged 11 commits into from
Sep 17, 2024
107 changes: 107 additions & 0 deletions database/src/main/postgres/runs/V1.9.7__create_partitioning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.create_partitioning(
    IN  i_partitioning          JSONB,
    IN  i_by_user               TEXT,
    IN  i_parent_partitioning   JSONB = NULL,
    OUT status                  INTEGER,
    OUT status_text             TEXT,
    OUT id_partitioning         BIGINT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.create_partitioning(3)
--      Creates a partitioning entry. Unlike a "create if not exists" call,
--      an already existing partitioning is reported with status 31 and is
--      left untouched (it is NOT linked to the provided parent).
--
-- Parameters:
--      i_partitioning          - partitioning to create
--      i_by_user               - user behind the change
--      i_parent_partitioning   - parent partitioning of the provided partitioning, optional
--
-- Returns:
--      status                  - Status code
--      status_text             - Status text
--      id_partitioning         - id of the partitioning (the existing one when status is 31)
--
-- Status codes:
--      11                      - Partitioning created
--      31                      - Partitioning already present
--
-------------------------------------------------------------------------------
DECLARE
    _fk_parent_partitioning     BIGINT := NULL;
BEGIN

    -- Look up an existing partitioning first; NULL means it has to be created.
    -- NOTE(review): the meaning of the second argument (true) is defined by
    -- runs._get_id_partitioning, which is not visible here - confirm.
    id_partitioning := runs._get_id_partitioning(i_partitioning, true);

    -- The parent is resolved (and created when missing) before the existence
    -- check of the partitioning itself.
    -- NOTE(review): this means a parent may get created even when the call
    -- ends with status 31 - confirm this side effect is intended.
    IF i_parent_partitioning IS NOT NULL THEN
        SELECT CPINE.id_partitioning
        FROM runs.create_partitioning_if_not_exists(i_parent_partitioning, i_by_user, NULL) AS CPINE
        INTO _fk_parent_partitioning;
    END IF;

    -- Guard clause: nothing is modified for a partitioning that already exists.
    IF id_partitioning IS NOT NULL THEN
        status := 31;
        status_text := 'Partitioning already present';
        RETURN;
    END IF;

    INSERT INTO runs.partitionings (partitioning, created_by)
    VALUES (i_partitioning, i_by_user)
    RETURNING partitionings.id_partitioning
    INTO create_partitioning.id_partitioning;

    -- every new partitioning gets its own flow
    PERFORM 1
    FROM flows._create_flow(id_partitioning, i_by_user);

    status := 11;
    status_text := 'Partitioning created';

    IF i_parent_partitioning IS NOT NULL THEN
        -- the outcome of the flow registration does not influence the result,
        -- so it is intentionally discarded
        PERFORM 1
        FROM flows._add_to_parent_flows(_fk_parent_partitioning, id_partitioning, i_by_user);

        -- copying measure definitions to establish continuity
        INSERT INTO runs.measure_definitions(fk_partitioning, measure_name, measured_columns, created_by, created_at)
        SELECT id_partitioning, CMD.measure_name, CMD.measured_columns, CMD.created_by, CMD.created_at
        FROM runs.measure_definitions CMD
        WHERE CMD.fk_partitioning = _fk_parent_partitioning;

        -- additional data are not copied, they are specific for particular partitioning
    END IF;

    RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.create_partitioning(JSONB, TEXT, JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.create_partitioning(JSONB, TEXT, JSONB) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

/**
 * Integration tests of the database function `runs.create_partitioning`.
 *
 * The tests cover the creation of a brand new partitioning (with and without a parent) and
 * the behavior when the partitioning already exists - in that case status 31 is expected and
 * no additional flow link is registered for the partitioning.
 */
class CreatePartitioningIntegrationTests extends DBTestSuite {

  private val fncCreatePartitioning = "runs.create_partitioning"

  // The partitioning being created in the tests.
  private val partitioning = JsonBString(
    """
      |{
      | "version": 1,
      | "keys": ["key1", "key3", "key2", "key4"],
      | "keysToValues": {
      | "key1": "valueX",
      | "key2": "valueY",
      | "key3": "valueZ",
      | "key4": "valueA"
      | }
      |}
      |""".stripMargin
  )

  // Partitioning used as the parent of `partitioning`; its keys are a subset of the child's keys.
  private val parentPartitioning = JsonBString(
    """
      |{
      | "version": 1,
      | "keys": ["key1", "key3"],
      | "keysToValues": {
      | "key1": "valueX",
      | "key3": "valueZ"
      | }
      |}
      |""".stripMargin
  )

  test("Partitioning created") {
    val partitioningID = function(fncCreatePartitioning)
      .setParam("i_partitioning", partitioning)
      .setParam("i_by_user", "Fantômas")
      .setParamNull("i_parent_partitioning")
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(11))
        assert(row.getString("status_text").contains("Partitioning created"))
        row.getLong("id_partitioning").get
      }

    table("runs.partitionings").where(add("id_partitioning", partitioningID)) { partitioningResult =>
      // fail with a clear assertion instead of an exception from next() when the row is missing
      assert(partitioningResult.hasNext)
      val row = partitioningResult.next()
      // assert(row.getJsonB("partitioning").contains(partitioning)) TODO keys are reordered in JsonB and whitespaces removed
      assert(row.getString("created_by").contains("Fantômas"))
      assert(row.getOffsetDateTime("created_at").contains(now()))
    }

    // exactly one flow link is expected for a partitioning created without a parent
    val idFlow = table("flows.partitioning_to_flow").where(add("fk_partitioning", partitioningID)) { partToFlowResult =>
      assert(partToFlowResult.hasNext)
      val partToFlowRow = partToFlowResult.next()
      val result = partToFlowRow.getLong("fk_flow")
      assert(partToFlowRow.getString("created_by").contains("Fantômas"))
      assert(!partToFlowResult.hasNext)
      result.get
    }

    table("flows.flows").where(add("id_flow", idFlow)) { flowsResult =>
      assert(flowsResult.hasNext)
      val flowRow = flowsResult.next()
      assert(flowRow.getString("flow_name").exists(_.startsWith("Custom flow #")))
      assert(flowRow.getString("flow_description").contains(""))
      assert(flowRow.getBoolean("from_pattern").contains(false))
      assert(flowRow.getString("created_by").contains("Fantômas"))
      assert(flowRow.getOffsetDateTime("created_at").contains(now()))
      assert(!flowsResult.hasNext)
    }
  }

  test("Partitioning created with parent partitioning that already exists") {
    // the parent is created first, relying on the default (NULL) of i_parent_partitioning
    val parentPartitioningID = function(fncCreatePartitioning)
      .setParam("i_partitioning", parentPartitioning)
      .setParam("i_by_user", "Albert Einstein")
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(11))
        assert(row.getString("status_text").contains("Partitioning created"))
        row.getLong("id_partitioning").get
      }

    assert(
      table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
    )

    val partitioningID = function(fncCreatePartitioning)
      .setParam("i_partitioning", partitioning)
      .setParam("i_by_user", "Fantômas")
      .setParam("i_parent_partitioning", parentPartitioning)
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(11))
        assert(row.getString("status_text").contains("Partitioning created"))
        row.getLong("id_partitioning").get
      }

    // the parent keeps its single flow, the child has its own flow plus the parent's one
    assert(
      table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
    )
    assert(
      table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2
    )
  }

  test("Partitioning already exists") {
    val partitioningID = function(fncCreatePartitioning)
      .setParam("i_partitioning", partitioning)
      .setParam("i_by_user", "Fantômas")
      .setParamNull("i_parent_partitioning")
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(11))
        assert(row.getString("status_text").contains("Partitioning created"))
        row.getLong("id_partitioning").get
      }

    // the second call with the same partitioning must report the existing entry
    function(fncCreatePartitioning)
      .setParam("i_partitioning", partitioning)
      .setParam("i_by_user", "Fantômas")
      .setParamNull("i_parent_partitioning")
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(31))
        assert(row.getString("status_text").contains("Partitioning already present"))
        assert(row.getLong("id_partitioning").contains(partitioningID))
      }

    assert(
      table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
    )
  }

  test("Partitioning exists, parent is not added") {
    val partitioningID = function(fncCreatePartitioning)
      .setParam("i_partitioning", partitioning)
      .setParam("i_by_user", "Fantômas")
      .setParamNull("i_parent_partitioning")
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(11))
        assert(row.getString("status_text").contains("Partitioning created"))
        row.getLong("id_partitioning").get
      }

    assert(
      table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
    )

    // providing a parent for an already existing partitioning must not change anything
    function(fncCreatePartitioning)
      .setParam("i_partitioning", partitioning)
      .setParam("i_by_user", "Fantômas")
      .setParam("i_parent_partitioning", parentPartitioning)
      .execute { queryResult =>
        assert(queryResult.hasNext)
        val row = queryResult.next()
        assert(row.getInt("status").contains(31))
        assert(row.getString("status_text").contains("Partitioning already present"))
        // same check as in "Partitioning already exists": the existing id is returned
        assert(row.getLong("id_partitioning").contains(partitioningID))
      }

    // no link to the parent's flow was added
    assert(
      table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
    )
  }
}
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object Main extends ZIOAppDefault with Server {
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
CreatePartitioning.layer,
GetPartitioningMeasures.layer,
GetPartitioningMeasuresById.layer,
GetPartitioningAdditionalData.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ trait PartitioningController {
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, AtumContextDTO]

def createPartitioningIfNotExistsV2(
def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]]
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)]

def getPartitioningAdditionalDataV2(
partitioningId: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
Expand Down Expand Up @@ -49,12 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
atumContextDTOEffect
}

override def createPartitioningIfNotExistsV2(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]] = {
mapToSingleSuccessResponse(createPartitioningIfNotExistsV1(partitioningSubmitDTO))
}

override def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
Expand Down Expand Up @@ -84,6 +79,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

/**
 * Creates a partitioning through the partitioning service and pairs the wrapped result with
 * the path of the newly created resource.
 *
 * @param partitioningSubmitDTO the partitioning to create
 * @return the created partitioning wrapped in a [[SingleSuccessResponse]] together with a path
 *         built from `V2Paths.Partitionings` and the id of the created partitioning
 *         (presumably meant for a `Location` header - confirm against the endpoint definition)
 */
override def postPartitioning(
  partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)] = {
  val createdPartitioning = serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
    partitioningService.createPartitioning(partitioningSubmitDTO)
  )

  mapToSingleSuccessResponse(createdPartitioning).flatMap { response =>
    // the resource path can only be built once the id of the new partitioning is known
    val pathSegments = Seq(V2Paths.Partitionings, response.data.id.toString)
    createV2RootAnchoredResourcePath(pathSegments).map(uri => (response, uri))
  }
}

override def patchPartitioningAdditionalDataV2(
partitioningId: Long,
additionalDataPatchDTO: AdditionalDataPatchDTO
Expand Down
Loading
Loading