Skip to content

Commit

Permalink
Feature/230 get partitioning checkpoints (#265)
Browse files Browse the repository at this point in the history
get partitioning checkpoints
  • Loading branch information
salamonpavel authored Sep 26, 2024
1 parent 2203397 commit 43ec036
Show file tree
Hide file tree
Showing 29 changed files with 680 additions and 336 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/jacoco_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Add coverage to PR (model)
if: steps.jacocorun.outcome == 'success'
id: jacoco-model
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -74,7 +74,7 @@ jobs:
- name: Add coverage to PR (agent)
if: steps.jacocorun.outcome == 'success'
id: jacoco-agent
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/agent/target/spark3-jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -85,7 +85,7 @@ jobs:
- name: Add coverage to PR (reader)
if: steps.jacocorun.outcome == 'success'
id: jacoco-reader
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/reader/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -96,7 +96,7 @@ jobs:
- name: Add coverage to PR (server)
if: steps.jacocorun.outcome == 'success'
id: jacoco-server
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/server/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* limitations under the License.
*/

-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
IN i_partitioning JSONB,
IN i_limit INT DEFAULT 5,
IN i_partitioning_id BIGINT,
IN i_checkpoints_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
Expand All @@ -28,86 +28,106 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE,
OUT has_more BOOLEAN
)
RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
-- Retrieves all checkpoints (measures and their measurement details) related to a
-- Function: runs.get_partitioning_checkpoints(4)
-- Retrieves checkpoints (measures and their measurement details) related to a
-- given partitioning (and checkpoint name, if specified).
--
-- Parameters:
-- i_partitioning - partitioning of requested checkpoints
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoints_limit - (optional) maximum number of checkpoints to return
-- i_offset - (optional) offset of the first checkpoint to return
-- i_checkpoint_name - (optional) name of the checkpoint

-- Note: i_checkpoints_limit and i_offset are used for pagination purposes;
-- checkpoints are ordered by process_start_time in descending order
-- and then by id_checkpoint in ascending order
--
-- Returns:
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
-- status - Status code
-- status_text - Status message
-- id_checkpoint - ID of the checkpoint
-- checkpoint_name - Name of the checkpoint
-- author - Author of the checkpoint
-- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent
-- measure_name - Name of the measure
-- measure_columns - Columns of the measure
-- measurement_value - Value of the measurement
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
-- has_more - Flag indicating whether there are more checkpoints available
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
$$
DECLARE
_fk_partitioning BIGINT;
_has_more BOOLEAN;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

IF i_checkpoints_limit IS NOT NULL THEN
SELECT count(*) > i_checkpoints_limit
FROM runs.checkpoints C
WHERE C.fk_partitioning = i_partitioning_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
LIMIT i_checkpoints_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;

RETURN QUERY
WITH limited_checkpoints AS (
SELECT C.id_checkpoint,
C.checkpoint_name,
C.created_by,
C.measured_by_atum_agent,
C.process_start_time,
C.process_end_time
FROM runs.checkpoints C
WHERE C.fk_partitioning = i_partitioning_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY C.id_checkpoint, C.process_start_time
LIMIT i_checkpoints_limit OFFSET i_offset
)
SELECT
11 AS status,
'Ok' AS status_text,
C.id_checkpoint,
C.checkpoint_name,
C.created_by AS author,
C.measured_by_atum_agent,
LC.id_checkpoint,
LC.checkpoint_name,
LC.created_by AS author,
LC.measured_by_atum_agent,
md.measure_name,
md.measured_columns,
M.measurement_value,
C.process_start_time AS checkpoint_start_time,
C.process_end_time AS checkpoint_end_time
LC.process_start_time AS checkpoint_start_time,
LC.process_end_time AS checkpoint_end_time,
_has_more AS has_more
FROM
runs.checkpoints C
JOIN
runs.measurements M ON C.id_checkpoint = M.fk_checkpoint
JOIN
limited_checkpoints LC
INNER JOIN
runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint
INNER JOIN
runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
WHERE
C.fk_partitioning = _fk_partitioning
AND
(i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY
C.process_start_time,
C.id_checkpoint
LIMIT nullif(i_limit, 0);

LC.id_checkpoint, LC.process_start_time;
END;
$$

LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner;

GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner;

ALTER FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner;
Loading

0 comments on commit 43ec036

Please sign in to comment.