diff --git a/snowflake/objects/database/recover/schema/parquet/deploy.sql b/snowflake/objects/database/recover/schema/parquet/deploy.sql index 0cac7590..0da976a1 100644 --- a/snowflake/objects/database/recover/schema/parquet/deploy.sql +++ b/snowflake/objects/database/recover/schema/parquet/deploy.sql @@ -1,11 +1,15 @@ /* Create a parquet schema (if it doesn't yet exist) and deploy all child objects. + + Jinja templating variables: + git_branch - The name of the git branch from which we are deploying. */ CREATE SCHEMA IF NOT EXISTS parquet; USE SCHEMA parquet; SET parquet_file_format_name = 'parquet_format'; -SET parquet_stage_name = 'parquet_s3'; +SET parquet_prod_stage_name = 'parquet_prod_s3'; +SET parquet_dev_stage_name = 'parquet_dev_s3'; EXECUTE IMMEDIATE FROM './file_format/deploy.sql' @@ -16,7 +20,29 @@ EXECUTE IMMEDIATE FROM './stage/deploy.sql' USING ( git_branch => '{{ git_branch }}', - parquet_stage_name => $parquet_stage_name + parquet_prod_stage_name => $parquet_prod_stage_name, + parquet_dev_stage_name => $parquet_dev_stage_name ); EXECUTE IMMEDIATE FROM './table/deploy.sql'; +EXECUTE IMMEDIATE +$$ +BEGIN + IF ('{{ git_branch }}' = 'main') THEN + -- Our procedures will reference the prod stage + EXECUTE IMMEDIATE + FROM './procedure/deploy.sql' + USING ( + stage_name => $parquet_prod_stage_name, + file_format => $parquet_file_format_name + ); + ELSE + EXECUTE IMMEDIATE + FROM './procedure/deploy.sql' + USING ( + stage_name => $parquet_dev_stage_name, + file_format => $parquet_file_format_name + ); + END IF; +END; +$$; diff --git a/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql b/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql new file mode 100644 index 00000000..062a0690 --- /dev/null +++ b/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql @@ -0,0 +1,35 @@ +/* + A stored procedure which copies Parquet data from a named stage into a table. + + Because of limitations in how we can pass variables to stage names, + this procedure is specific to a stage location. That is, we cannot + use Snowflake scripting variables within the stage name, so we instead + use Jinja variables, which has the side effect of "fixing" the procedure + to use a specific stage location. + + Jinja templating variables: + datatype - The datatype which our stage location refers to. + stage_name - The name of the stage where our data exists. + stage_path - The location within the stage where our data exists. + file_format - The name of the file format object used during copy. + */ +CREATE OR REPLACE PROCEDURE copy_into_table_from_{{ datatype }}_parquet_stage( + target_table VARCHAR +) + RETURNS TABLE () + LANGUAGE SQL +as +$$ +DECLARE + res RESULTSET DEFAULT ( + COPY INTO IDENTIFIER(:target_table) + FROM @{{ stage_name }}/{{ stage_path }} + FILE_FORMAT = ( + FORMAT_NAME = '{{ file_format }}' + ) + MATCH_BY_COLUMN_NAME = CASE_SENSITIVE + ); +BEGIN + RETURN TABLE(res); +END; +$$; diff --git a/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql b/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql new file mode 100644 index 00000000..665ff158 --- /dev/null +++ b/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql @@ -0,0 +1,61 @@ +/* + Deploy all PROCEDURE objects + + Jinja templating variables: + stage_name - The name of the stage where our data exists. + file_format - The name of the file format object used by the + `copy_into_table_from_stage.sql` procedure. +*/ + +WITH create_procedure_for_each_parquet_table AS PROCEDURE () + RETURNS VARCHAR + LANGUAGE SQL +AS +$$ +DECLARE + parquet_datatypes ARRAY := [ + 'enrolledparticipants_customfields_symptoms', + 'enrolledparticipants_customfields_treatments', + 'enrolledparticipants', + 'fitbitactivitylogs', + 'fitbitdailydata', + 'fitbitdevices', + 'fitbitecg', + 'fitbitecg_waveformsamples', + 'fitbitintradaycombined', + 'fitbitrestingheartrates', + 'fitbitsleeplogs', + 'fitbitsleeplogs_sleeplogdetails', + 'googlefitsamples', + 'healthkitv2activitysummaries', + 'healthkitv2electrocardiogram', + 'healthkitv2electrocardiogram_subsamples', + 'healthkitv2heartbeat', + 'healthkitv2heartbeat_subsamples', + 'healthkitv2samples', + 'healthkitv2statistics', + 'healthkitv2workouts_events', + 'healthkitv2workouts', + 'symptomlog', + 'symptomlog_value_symptoms', + 'symptomlog_value_treatments' + ]; + datatype VARCHAR; + dataset_name VARCHAR; +BEGIN + FOR i in 0 to array_size(:parquet_datatypes)-1 DO + datatype := GET(:parquet_datatypes, :i); + dataset_name := CONCAT('dataset_', :datatype); + -- Create a stored procedure which uses this data type's stage location + EXECUTE IMMEDIATE + FROM './copy_into_table_from_stage.sql' + USING ( + datatype => :datatype, + stage_name => '{{ stage_name }}', + stage_path => :dataset_name, + file_format => '{{ file_format }}' + ); + END FOR; +END; +$$ +CALL create_procedure_for_each_parquet_table(); diff --git a/snowflake/objects/database/recover/schema/parquet/stage/deploy.sql b/snowflake/objects/database/recover/schema/parquet/stage/deploy.sql index ad267807..8c5e3b2a 100644 --- a/snowflake/objects/database/recover/schema/parquet/stage/deploy.sql +++ b/snowflake/objects/database/recover/schema/parquet/stage/deploy.sql @@ -2,8 +2,14 @@ Deploy all stages under the `parquet` schema. */ EXECUTE IMMEDIATE - FROM './parquet_s3.sql' + FROM './parquet_prod_s3.sql' USING ( git_branch => '{{ git_branch }}', - parquet_stage_name => '{{ parquet_stage_name }}' + parquet_stage_name => '{{ parquet_prod_stage_name }}' + ); +EXECUTE IMMEDIATE + FROM './parquet_dev_s3.sql' + USING ( + git_branch => '{{ git_branch }}', + parquet_stage_name => '{{ parquet_dev_stage_name }}' ); diff --git a/snowflake/objects/database/recover/schema/parquet/stage/parquet_dev_s3.sql b/snowflake/objects/database/recover/schema/parquet/stage/parquet_dev_s3.sql new file mode 100644 index 00000000..aa3b8124 --- /dev/null +++ b/snowflake/objects/database/recover/schema/parquet/stage/parquet_dev_s3.sql @@ -0,0 +1,6 @@ +/* + Create an external stage over the dev Parquet data in S3 +*/ +CREATE OR REPLACE STAGE {{ parquet_stage_name }} + URL = 's3://recover-dev-processed-data/{{ git_branch }}/parquet/' + STORAGE_INTEGRATION = recover_dev_s3; diff --git a/snowflake/objects/database/recover/schema/parquet/stage/parquet_s3.sql b/snowflake/objects/database/recover/schema/parquet/stage/parquet_prod_s3.sql similarity index 70% rename from snowflake/objects/database/recover/schema/parquet/stage/parquet_s3.sql rename to snowflake/objects/database/recover/schema/parquet/stage/parquet_prod_s3.sql index 943ee5da..4184b21c 100644 --- a/snowflake/objects/database/recover/schema/parquet/stage/parquet_s3.sql +++ b/snowflake/objects/database/recover/schema/parquet/stage/parquet_prod_s3.sql @@ -1,5 +1,5 @@ /* - Create an external stage over the Parquet data in S3 + Create an external stage over the production Parquet data in S3 */ CREATE OR REPLACE STAGE {{ parquet_stage_name }} URL = 's3://recover-processed-data/{{ git_branch }}/parquet/' diff --git a/snowflake/objects/deploy.sql b/snowflake/objects/deploy.sql index c06a08ce..5a797ef3 100644 --- a/snowflake/objects/deploy.sql +++ b/snowflake/objects/deploy.sql @@ -16,6 +16,9 @@ - STORAGE INTEGRATION `RECOVER_PROD_S3` * An S3 storage integration which allows access to the S3 buckets in the RECOVER production account. + - STORAGE INTEGRATION `RECOVER_DEV_S3` + * An S3 storage integration which allows access to the + S3 buckets in the RECOVER dev account. Additionally, we assume that the following databases have already been created when deploying to the "staging" or "main" environment, respectively: