Skip to content

Commit

Permalink
Feature/224 post partitioning (#258)
Browse files Browse the repository at this point in the history
post partitioning V2
  • Loading branch information
salamonpavel authored Sep 17, 2024
1 parent 18f2285 commit 3398044
Show file tree
Hide file tree
Showing 20 changed files with 637 additions and 60 deletions.
93 changes: 93 additions & 0 deletions database/src/main/postgres/runs/V1.9.7__create_partitioning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.create_partitioning(
IN i_partitioning JSONB,
IN i_by_user TEXT,
IN i_parent_partitioning_id BIGINT = NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_partitioning BIGINT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.create_partitioning(3)
-- Creates a partitioning entry
--
-- Parameters:
-- i_partitioning - partitioning to create or which existence to check
-- i_by_user - user behind the change
-- i_parent_partitioning_id - (optional) parent partitioning id
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_partitioning - id of the partitioning
--
-- Status codes:
-- 11 - Partitioning created
-- 12 - Partitioning created with parent partitioning
-- 31 - Partitioning already exists
-- 41 - Parent partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
id_partitioning := runs._get_id_partitioning(i_partitioning, true);

IF id_partitioning IS NOT NULL THEN
status := 31;
status_text := 'Partitioning already exists';
RETURN;
END IF;

IF i_parent_partitioning_id IS NOT NULL THEN
PERFORM 1 FROM runs.partitionings P WHERE P.id_partitioning = i_parent_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Parent partitioning not found';
RETURN;
END IF;
END IF;

INSERT INTO runs.partitionings (partitioning, created_by)
VALUES (i_partitioning, i_by_user)
RETURNING partitionings.id_partitioning INTO create_partitioning.id_partitioning;

PERFORM 1 FROM flows._create_flow(id_partitioning, i_by_user);
status := 11;
status_text := 'Partitioning created';

IF i_parent_partitioning_id IS NOT NULL THEN
PERFORM 1 FROM flows._add_to_parent_flows(i_parent_partitioning_id, id_partitioning, i_by_user);

-- copying measure definitions to establish continuity
INSERT INTO runs.measure_definitions(fk_partitioning, measure_name, measured_columns, created_by, created_at)
SELECT id_partitioning, CMD.measure_name, CMD.measured_columns, CMD.created_by, CMD.created_at
FROM runs.measure_definitions CMD
WHERE CMD.fk_partitioning = i_parent_partitioning_id;

status := 12;
status_text := 'Partitioning created with parent partitioning';
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.create_partitioning(JSONB, TEXT, BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.create_partitioning(JSONB, TEXT, BIGINT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class CreatePartitioningIntegrationTests extends DBTestSuite{

private val fncCreatePartitioning = "runs.create_partitioning"

private val partitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

private val parentPartitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3"],
| "keysToValues": {
| "key1": "valueX",
| "key3": "valueZ"
| }
|}
|""".stripMargin
)

test("Partitioning created") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

table("runs.partitionings").where(add("id_partitioning", partitioningID)) {partitioningResult =>
val row = partitioningResult.next()
assert(row.getString("created_by").contains("Fantômas"))
assert(row.getOffsetDateTime("created_at").contains(now()))
}

val idFlow = table("flows.partitioning_to_flow").where(add("fk_partitioning", partitioningID)) { partToFlowResult =>
assert(partToFlowResult.hasNext)
val partToFlowRow = partToFlowResult.next()
val result = partToFlowRow.getLong("fk_flow")
assert(partToFlowRow.getString("created_by").contains("Fantômas"))
assert(!partToFlowResult.hasNext)
result.get
}

table("flows.flows").where(add("id_flow", idFlow)) {flowsResult =>
assert(flowsResult.hasNext)
val flowRow = flowsResult.next()
assert(flowRow.getString("flow_name").exists(_.startsWith("Custom flow #")))
assert(flowRow.getString("flow_description").contains(""))
assert(flowRow.getBoolean("from_pattern").contains(false))
assert(flowRow.getString("created_by").contains("Fantômas"))
assert(flowRow.getOffsetDateTime("created_at").contains(now()))
assert(!flowsResult.hasNext)
}
}

test("Partitioning created with parent partitioning that already exists") {
val parentPartitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", parentPartitioning)
.setParam("i_by_user", "Albert Einstein")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
)
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParam("i_parent_partitioning_id", parentPartitioningID)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(12))
assert(row.getString("status_text").contains("Partitioning created with parent partitioning"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
)
assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2
)
}

test("Partitioning already exists") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Partitioning already exists"))
assert(row.getLong("id_partitioning").contains(partitioningID))
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)
}

test("Partitioning exists, parent is not added") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)

function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParam("i_parent_partitioning_id", 123456789L)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Partitioning already exists"))
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.dto

import io.circe.generic.semiauto._
import io.circe._

case class PartitioningSubmitV2DTO(
partitioning: PartitioningDTO,
parentPartitioningId: Option[Long],
author: String
)

object PartitioningSubmitV2DTO {
implicit val decodePartitioningSubmitV2DTO: Decoder[PartitioningSubmitV2DTO] = deriveDecoder
implicit val encodePartitioningSubmitV2DTO: Encoder[PartitioningSubmitV2DTO] = deriveEncoder
}
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object Main extends ZIOAppDefault with Server {
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
CreatePartitioning.layer,
GetPartitioningMeasures.layer,
GetPartitioningMeasuresById.layer,
GetPartitioningAdditionalData.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ trait PartitioningController {
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, AtumContextDTO]

def createPartitioningIfNotExistsV2(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]]
def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitV2DTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)]

def getPartitioningAdditionalDataV2(
partitioningId: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
Expand Down Expand Up @@ -49,12 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
atumContextDTOEffect
}

override def createPartitioningIfNotExistsV2(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]] = {
mapToSingleSuccessResponse(createPartitioningIfNotExistsV1(partitioningSubmitDTO))
}

override def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
Expand Down Expand Up @@ -84,6 +79,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitV2DTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)] = {
for {
response <- mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.createPartitioning(partitioningSubmitDTO)
)
)
uri <- createV2RootAnchoredResourcePath(
Seq(V2Paths.Partitionings, response.data.id.toString)
)
} yield (response, uri)
}

override def patchPartitioningAdditionalDataV2(
partitioningId: Long,
additionalDataPatchDTO: AdditionalDataPatchDTO
Expand Down
Loading

0 comments on commit 3398044

Please sign in to comment.