Feature/235 get flow partitionings #267
Merged
22 commits
a1e822c endpoint (salamonpavel)
bc18a03 Merge branch 'master' into feature/235-get-flow-partitionings (salamonpavel)
649f969 get_flow_partitionings (salamonpavel)
165c3dd test unfinished (salamonpavel)
ad08738 test sql (salamonpavel)
3fb80b6 test sql (salamonpavel)
b6e4038 test sql (salamonpavel)
dcdc93e _add_to_parent_flows fix (salamonpavel)
bb2cbf9 tmp (salamonpavel)
44ab165 implementation without tests (salamonpavel)
6a2be40 GetFlowPartitioningsIntegrationTests (salamonpavel)
10ff0b6 repository tests (salamonpavel)
aa5c39f service and controller tests (salamonpavel)
d2b8e30 GetFlowPartitioningsV2EndpointUnitTests (salamonpavel)
01fd90a input validation, sql pagination changed (salamonpavel)
194b08f Merge branch 'master' into feature/235-get-flow-partitionings (salamonpavel)
06b18de pr comments addressed (salamonpavel)
bdcc00e Merge branch 'master' into feature/235-get-flow-partitionings (salamonpavel)
baa526e no status when no results (salamonpavel)
819c05f Update database/src/main/postgres/flows/V1.9.8__get_flow_partitioning… (salamonpavel)
f9f016c Merge branch 'master' into feature/235-get-flow-partitionings (salamonpavel)
56adaa1 conflicts resolved (salamonpavel)
97 changes: 97 additions & 0 deletions
database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql
@@ -0,0 +1,97 @@
/*
 * Copyright 2021 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

CREATE OR REPLACE FUNCTION flows.get_flow_partitionings(
    IN i_flow_id BIGINT,
    IN i_limit INT DEFAULT 5,
    IN i_offset BIGINT DEFAULT 0,
    OUT status INTEGER,
    OUT status_text TEXT,
    OUT id BIGINT,
    OUT partitioning JSONB,
    OUT author TEXT,
    OUT has_more BOOLEAN
) RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: flows.get_flow_partitionings(3)
--      Retrieves all partitionings associated with the input flow.
--
-- Note: partitionings are returned ordered by id_partitioning first, then by created_at (descending) from the runs.partitionings table
--
-- Parameters:
--      i_flow_id - flow id to use for identifying the partitionings that will be retrieved
--      i_limit - (optional) maximum number of partitionings to return, default is 5
--      i_offset - (optional) offset to use for pagination, default is 0
--
-- Returns:
--      status - Status code
--      status_text - Status text
--      id - ID of retrieved partitioning
--      partitioning - Partitioning value
--      author - Author of the partitioning
--      has_more - Flag indicating if there are more partitionings available
--
-- Status codes:
--      11 - OK
--      41 - Flow not found
--
-------------------------------------------------------------------------------
$$
DECLARE
    _has_more BOOLEAN;
BEGIN
    PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id;
    IF NOT FOUND THEN
        status := 41;
        status_text := 'Flow not found';
        RETURN NEXT;
        RETURN;
    END IF;

    IF i_limit IS NOT NULL THEN
        -- Probe one row past the requested page. LIMIT/OFFSET must apply to the
        -- rows themselves, not to the single row produced by count(*).
        SELECT count(*) > i_limit
        FROM (
            SELECT 1
            FROM flows.partitioning_to_flow PTF
            WHERE PTF.fk_flow = i_flow_id
            LIMIT i_limit + 1 OFFSET i_offset
        ) AS probe
        INTO _has_more;
    ELSE
        _has_more := false;
    END IF;

    RETURN QUERY
        SELECT
            11 AS status,
            'OK' AS status_text,
            P.id_partitioning,
            P.partitioning,
            P.created_by,
            _has_more
        FROM
            runs.partitionings P INNER JOIN
            flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning
        WHERE
            PF.fk_flow = i_flow_id
        ORDER BY
            P.id_partitioning,
            P.created_at DESC
        LIMIT i_limit OFFSET i_offset;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner;
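
The paging contract documented in the function header (`i_limit`, `i_offset`, `has_more`) can be pictured outside the database. The following is a minimal Scala sketch, not code from this PR: it assumes an in-memory sequence in place of `flows.partitioning_to_flow`, and the names `Page` and `Paging.page` are hypothetical. It reads one row past the requested page and reports whether that extra row existed.

```scala
// Hypothetical in-memory model of the paging contract; not part of the PR.
final case class Page[A](items: Seq[A], hasMore: Boolean)

object Paging {
  // Reads up to `limit + 1` rows starting at `offset`.
  // The extra row is only a probe: it is never returned to the caller.
  def page[A](rows: Seq[A], limit: Int, offset: Int): Page[A] = {
    require(limit > 0, "limit must be positive")
    require(offset >= 0, "offset must not be negative")
    val probe = rows.drop(offset).take(limit + 1)
    Page(probe.take(limit), probe.size > limit)
  }
}
```

With three rows, `Paging.page(Seq(1, 2, 3), limit = 2, offset = 0)` returns the first two items with `hasMore = true`, while `offset = 2` returns only the last item with `hasMore = false`.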
193 changes: 193 additions & 0 deletions
.../src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala
@@ -0,0 +1,193 @@
/*
 * Copyright 2021 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.atum.database.flows

import io.circe.Json
import io.circe.parser.parse
import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class GetFlowPartitioningsIntegrationTests extends DBTestSuite {

  private val getFlowPartitioningsFn = "flows.get_flow_partitionings"
  private val createFlowFn = "flows._create_flow"
  private val addToParentFlowsFn = "flows._add_to_parent_flows"

  private val partitioningsTable = "runs.partitionings"

  private val partitioning1 = JsonBString(
    """
      |{
      |  "version": 1,
      |  "keys": ["keyA", "keyB", "keyC"],
      |  "keysToValues": {
      |    "keyA": "valueA",
      |    "keyB": "valueB",
      |    "keyC": "valueC"
      |  }
      |}
      |""".stripMargin
  )

  private val partitioning1Parent = JsonBString(
    """
      |{
      |  "version": 1,
      |  "keys": ["keyA", "keyB"],
      |  "keysToValues": {
      |    "keyA": "valueA",
      |    "keyB": "valueB"
      |  }
      |}
      |""".stripMargin
  )

  private val partitioning2 = JsonBString(
    """
      |{
      |  "version": 1,
      |  "keys": ["keyD", "keyE", "keyF"],
      |  "keysToValues": {
      |    "keyD": "valueD",
      |    "keyE": "valueE",
      |    "keyF": "valueF"
      |  }
      |}
      |""".stripMargin
  )

  var flowIdOfPartitioning1: Long = _
  var flowIdOfParentPartitioning1: Long = _
  var flowIdOfPartitioning2: Long = _
  var flowIdOfPartitioning3: Long = _

  test("Returns partitioning(s) for a given flow") {
    table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Joseph"))
    table(partitioningsTable).insert(add("partitioning", partitioning1Parent).add("created_by", "Joseph"))
    table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Joseph"))

    val partId1: Long = table(partitioningsTable)
      .fieldValue("partitioning", partitioning1, "id_partitioning").get.get

    val partId1Parent: Long = table(partitioningsTable)
      .fieldValue("partitioning", partitioning1Parent, "id_partitioning").get.get

    val partId2: Long = table(partitioningsTable)
      .fieldValue("partitioning", partitioning2, "id_partitioning").get.get

    function(createFlowFn)
      .setParam("i_fk_partitioning", partId1)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get
      }

    function(createFlowFn)
      .setParam("i_fk_partitioning", partId1Parent)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        flowIdOfParentPartitioning1 = queryResult.next().getLong("id_flow").get
      }

    function(createFlowFn)
      .setParam("i_fk_partitioning", partId2)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get
      }

    function(addToParentFlowsFn)
      .setParam("i_fk_parent_partitioning", partId1Parent)
      .setParam("i_fk_partitioning", partId1)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "Partitioning added to flows")
      }

    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", flowIdOfPartitioning1)
      .setParam("i_limit", 1)
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "OK")
        assert(result1.getLong("id").get == partId1)
        val expectedPartitioningJson = parseJsonBStringOrThrow(partitioning1)
        val returnedPartitioningJson = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get)
        assert(expectedPartitioningJson == returnedPartitioningJson)
        assert(!result1.getBoolean("has_more").get)
        assert(!queryResult.hasNext)
      }

    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", flowIdOfParentPartitioning1)
      .setParam("i_limit", 1) // limit is set to 1, so only one partitioning should be returned and more data available
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "OK")
        assert(result1.getLong("id").get == partId1)
        val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1)
        val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get)
        assert(expectedPartitioningJson1 == returnedPartitioningJson1)
        assert(result1.getBoolean("has_more").get)
        assert(!queryResult.hasNext)
      }

    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", flowIdOfParentPartitioning1)
      .setParam("i_limit", 2) // limit is set to 2, so both partitionings should be returned and no more data available
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "OK")
        assert(result1.getLong("id").get == partId1)
        val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1)
        val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get)
        assert(expectedPartitioningJson1 == returnedPartitioningJson1)
        assert(!result1.getBoolean("has_more").get)
        assert(queryResult.hasNext)
        val result2 = queryResult.next()
        assert(result2.getLong("id").get == partId1Parent)
        val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning1Parent)
        val returnedPartitioningJson2 = parseJsonBStringOrThrow(result2.getJsonB("partitioning").get)
        assert(expectedPartitioningJson2 == returnedPartitioningJson2)
        assert(!result2.getBoolean("has_more").get)
        assert(!queryResult.hasNext)
      }
  }

  test("Fails for non-existent flow") {
    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", 999999)
      .setParam("i_limit", 1)
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 41)
        assert(result1.getString("status_text").get == "Flow not found")
        assert(!queryResult.hasNext)
      }
  }

  private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = {
    parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json"))
  }

}
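
The tests above read `has_more` from the rows of a single page. A caller that wants every partitioning of a flow would presumably keep requesting pages until the flag turns false. The sketch below illustrates that loop under stated assumptions: `PartitioningRow`, `PageWalker.collectAll` and the `fetch` function are hypothetical stand-ins for one call to `flows.get_flow_partitionings`, and nothing here is taken from the Atum codebase. Error rows such as status 41 are not modelled.

```scala
import scala.annotation.tailrec

// Hypothetical row shape loosely mirroring the OUT columns of the SQL function.
final case class PartitioningRow(id: Long, author: String, hasMore: Boolean)

object PageWalker {
  // `fetch(limit, offset)` stands in for a single call to the database function.
  def collectAll(fetch: (Int, Long) => Seq[PartitioningRow], limit: Int): Vector[PartitioningRow] = {
    require(limit > 0, "limit must be positive")

    @tailrec
    def loop(offset: Long, acc: Vector[PartitioningRow]): Vector[PartitioningRow] = {
      val rows = fetch(limit, offset)
      val collected = acc ++ rows
      // has_more is repeated on every row of a page, so the first row is enough.
      // An empty page always ends the loop.
      if (rows.headOption.exists(_.hasMore)) loop(offset + rows.size, collected)
      else collected
    }

    loop(0L, Vector.empty)
  }
}
```

Advancing the offset by the number of rows actually received, rather than by `limit`, keeps the loop correct even if a page comes back shorter than requested.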
Do we also want to care whether a given partitioning was primary for the given flow?
Little value now, and adds a table to the JOIN.
IMHO not worth it.
Btw, we could add the main flow id to partitioning table (for easier search and one less index).