From 8f4835844597d88793b1acec50b6f5ec7d16d003 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Wed, 6 Mar 2024 08:36:46 -0700 Subject: [PATCH 01/14] [CHANGE] Adding additional parameters to SBG e2e workflow --- airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py index deea94d5..2f1c76c4 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py @@ -54,6 +54,7 @@ # For unity data upload step, unity catalog "output_isofit_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_RFL___1", type="string"), "output_resample_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_RSRFL___1", type="string"), + "output_refcorrect_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_CORFL___1", type="string"), }, ) @@ -73,6 +74,7 @@ def setup(ti=None, **context): "input_aux_stac": context["params"]["input_aux_stac"], "output_isofit_collection_id": context["params"]["output_isofit_collection_id"], "output_resample_collection_id": context["params"]["output_resample_collection_id"], + "output_refcorrect_collection_id": context["params"]["output_refcorrect_collection_id"], } ti.xcom_push(key="cwl_args", value=json.dumps(task_dict)) From 1fb7c9d90b3420b92cff4b05e8deb3692c16e4ca Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 09:03:37 -0700 Subject: [PATCH 02/14] [CHANGE] Using the experimental sps-docker-cwl image --- airflow/dags/docker_cwl_pod.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 535aa59c..4ab12466 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -8,7 +8,8 @@ spec: containers: - name: cwl-docker - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 + # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 + image: 
ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop4 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: From 4e27343c0a0fd602f716cf9bc89c4df237a3e706 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 09:17:01 -0700 Subject: [PATCH 03/14] [CHANGE] Removing the constraint on 1 max concurrent DAG for each type --- airflow/dags/cwl_dag.py | 2 +- airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py | 2 +- airflow/dags/sbg_preprocess_cwl_dag.py | 19 ++++++------------- airflow/dags/sbg_preprocess_no_cwl.py | 2 +- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 9ebd00ca..a2c7f740 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -45,7 +45,7 @@ is_paused_upon_creation=False, catchup=False, schedule_interval=None, - max_active_runs=1, + # max_active_runs=1, default_args=dag_default_args, params={ "cwl_workflow": Param( diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py index 2f1c76c4..52a25777 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py @@ -34,7 +34,7 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=1, + # max_active_runs=1, default_args=dag_default_args, params={ "cwl_workflow": Param(CWL_WORKFLOW, type="string"), diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index a4549c9c..a3d4b88c 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -20,6 +20,9 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +# The path of the working directory where the CWL workflow is executed +# (aka the starting directory for cwl-runner) +WORKING_DIR = "/scratch" # Default DAG configuration dag_default_args = { @@ -27,9 +30,7 @@ "depends_on_past": False, "start_date": 
datetime.utcfromtimestamp(0), } -CWL_WORKFLOW = ( - "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -) +CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" CMR_STAC = "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z" dag = DAG( @@ -39,15 +40,11 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=1, + # max_active_runs=1, default_args=dag_default_args, params={ "cwl_workflow": Param(CWL_WORKFLOW, type="string"), "input_cmr_stac": Param(CMR_STAC, type="string"), - # "input_processing_labels": Param(["label1", "label2"], type="string[]"), - # "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"), - # "input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"), - # "input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"), "input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"), "input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"), "input_crid": Param("001", type="string"), @@ -67,9 +64,6 @@ def setup(ti=None, **context): task_dict = { "input_processing_labels": ["label1", "label2"], "input_cmr_stac": context["params"]["input_cmr_stac"], - # "input_cmr_collection_name": context["params"]["input_cmr_collection_name"], - # "input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"], - # "input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"], "input_unity_dapa_api": context["params"]["input_unity_dapa_api"], "input_unity_dapa_client": context["params"]["input_unity_dapa_client"], "input_crid": context["params"]["input_crid"], @@ -93,8 +87,7 @@ def setup(ti=None, **context): task_id="SBG_Preprocess_CWL", 
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", "/scratch"], - # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, + arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR], dag=dag, volume_mounts=[ k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}") diff --git a/airflow/dags/sbg_preprocess_no_cwl.py b/airflow/dags/sbg_preprocess_no_cwl.py index 5c4159ff..ec921066 100644 --- a/airflow/dags/sbg_preprocess_no_cwl.py +++ b/airflow/dags/sbg_preprocess_no_cwl.py @@ -62,7 +62,7 @@ is_paused_upon_creation=True, catchup=False, schedule=None, - max_active_runs=1, + # max_active_runs=1, default_args=dag_default_args, params={ "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"), From 45d554539593496be491cb4f03abd9bd4e0164f0 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 10:00:36 -0700 Subject: [PATCH 04/14] [CHANGE] Switching to max_active_runs=100 --- airflow/dags/cwl_dag.py | 2 +- airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py | 2 +- airflow/dags/sbg_preprocess_cwl_dag.py | 2 +- airflow/dags/sbg_preprocess_no_cwl.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index a2c7f740..2d672705 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -45,7 +45,7 @@ is_paused_upon_creation=False, catchup=False, schedule_interval=None, - # max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "cwl_workflow": Param( diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py index 52a25777..93f21162 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py @@ -34,7 +34,7 @@ 
is_paused_upon_creation=False, catchup=False, schedule=None, - # max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "cwl_workflow": Param(CWL_WORKFLOW, type="string"), diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index a3d4b88c..0d9d2a0c 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -40,7 +40,7 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - # max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "cwl_workflow": Param(CWL_WORKFLOW, type="string"), diff --git a/airflow/dags/sbg_preprocess_no_cwl.py b/airflow/dags/sbg_preprocess_no_cwl.py index ec921066..0863600b 100644 --- a/airflow/dags/sbg_preprocess_no_cwl.py +++ b/airflow/dags/sbg_preprocess_no_cwl.py @@ -62,7 +62,7 @@ is_paused_upon_creation=True, catchup=False, schedule=None, - # max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"), From 7e10f63b45ba01498e47ee44353e5b00c71da32b Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 10:44:46 -0700 Subject: [PATCH 05/14] Using --tmp-outdir-prefix to create the files locally --- airflow/dags/docker_cwl_pod.yaml | 2 +- airflow/docker/cwl/docker_cwl_entrypoint.sh | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 4ab12466..31f1438f 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -9,7 +9,7 @@ spec: containers: - name: cwl-docker # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop4 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop6 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: diff --git 
a/airflow/docker/cwl/docker_cwl_entrypoint.sh b/airflow/docker/cwl/docker_cwl_entrypoint.sh index 3beac4a2..bb2237bf 100755 --- a/airflow/docker/cwl/docker_cwl_entrypoint.sh +++ b/airflow/docker/cwl/docker_cwl_entrypoint.sh @@ -10,7 +10,7 @@ # $3: optional working directory, defaults to the current directory # Note: The working must be accessible by the Docker container that executes this script -set -e +set -ex cwl_workflow=$1 job_args=$2 work_dir=${3:-.} @@ -43,7 +43,9 @@ done # Execute CWL workflow . /usr/share/cwl/venv/bin/activate -cwl-runner "$cwl_workflow" "$job_args" +pwd +mkdir -p ./cache +cwl-runner --cachedir ./cache --tmp-outdir-prefix "$PWD"/ "$cwl_workflow" "$job_args" deactivate # Stop Docker engine From 696d0dcfb47b27fedb0a7e0f3e98c0c905781a4f Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 11:07:13 -0700 Subject: [PATCH 06/14] [FIX] cwl-runner: error: argument --tmp-outdir-prefix: not allowed with argument --cachedir --- airflow/dags/docker_cwl_pod.yaml | 2 +- airflow/docker/cwl/docker_cwl_entrypoint.sh | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 31f1438f..d5354cfa 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -9,7 +9,7 @@ spec: containers: - name: cwl-docker # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop6 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop7 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: diff --git a/airflow/docker/cwl/docker_cwl_entrypoint.sh b/airflow/docker/cwl/docker_cwl_entrypoint.sh index bb2237bf..300e485d 100755 --- a/airflow/docker/cwl/docker_cwl_entrypoint.sh +++ b/airflow/docker/cwl/docker_cwl_entrypoint.sh @@ -44,8 +44,7 @@ done # Execute CWL workflow . 
/usr/share/cwl/venv/bin/activate pwd -mkdir -p ./cache -cwl-runner --cachedir ./cache --tmp-outdir-prefix "$PWD"/ "$cwl_workflow" "$job_args" +cwl-runner --tmp-outdir-prefix "$PWD"/ "$cwl_workflow" "$job_args" deactivate # Stop Docker engine From d0b7c8b845c7b978db31fc89e9865e9315f69350 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 11:47:01 -0700 Subject: [PATCH 07/14] [CHANGE] Adding volume support to the generic cwl-dag DAG --- airflow/dags/cwl_dag.py | 55 +++++++++++++++++++++++--- airflow/dags/docker_cwl_pod.yaml | 2 +- airflow/dags/sbg_preprocess_cwl_dag.py | 2 +- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 2d672705..6932c19e 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -6,7 +6,9 @@ import json import uuid from datetime import datetime +import os +from airflow.operators.python import PythonOperator, get_current_context from airflow.models.param import Param from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s @@ -20,10 +22,12 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +# The path of the working directory where the CWL workflow is executed +# (aka the starting directory for cwl-runner) +WORKING_DIR = "/scratch" + # Example arguments -default_cwl_workflow = ( - "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -) +default_cwl_workflow = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" default_args_as_json_dict = { "input_processing_labels": ["label1", "label2"], "input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z", @@ -35,7 +39,11 @@ } # Default DAG configuration 
-dag_default_args = {"owner": "unity-sps", "depends_on_past": False, "start_date": datetime(2024, 1, 1, 0, 0)} +dag_default_args = { + "owner": "unity-sps", + "depends_on_past": False, + "start_date": datetime(2024, 1, 1, 0, 0) +} # The DAG dag = DAG( @@ -63,6 +71,16 @@ # Environment variables default_env_vars = {} +# This task that creates the working directory on the shared volume +def setup(ti=None, **context): + context = get_current_context() + dag_run_id = context["dag_run"].run_id + local_dir = os.path.dirname(f"/shared-task-data/{dag_run_id}") + os.makedirs(local_dir, exist_ok=True) + + +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) + # This section defines KubernetesPodOperator cwl_task = KubernetesPodOperator( namespace=POD_NAMESPACE, @@ -76,6 +94,33 @@ metadata=k8s.V1ObjectMeta(name="docker-cwl-pod-" + uuid.uuid4().hex), ), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}"], + arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}", WORKING_DIR], dag=dag, + volume_mounts=[ + k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") + ], + volumes=[ + k8s.V1Volume( + name="workers-volume", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="kpo-efs"), + ) + ], ) + +def cleanup(**context): + dag_run_id = context["dag_run"].run_id + local_dir = f"/shared-task-data/{dag_run_id}" + if os.path.exists(local_dir): + shutil.rmtree(local_dir) + print(f"Deleted directory: {local_dir}") + else: + print(f"Directory does not exist, no need to delete: {local_dir}") + + +cleanup_task = PythonOperator( + task_id="Cleanup", + python_callable=cleanup, + dag=dag, +) + +setup_task >> cwl_task >> cleanup_task \ No newline at end of file diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index d5354cfa..54524252 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ 
b/airflow/dags/docker_cwl_pod.yaml @@ -9,7 +9,7 @@ spec: containers: - name: cwl-docker # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop7 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop8 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index 0d9d2a0c..2b9aa918 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -90,7 +90,7 @@ def setup(ti=None, **context): arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR], dag=dag, volume_mounts=[ - k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}") + k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") ], volumes=[ k8s.V1Volume( From 6393397f38619d870fe72caccc9d3e333a1e1f1a Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 16:43:20 -0700 Subject: [PATCH 08/14] [FIX] Adding shutil --- airflow/dags/cwl_dag.py | 1 + airflow/dags/docker_cwl_pod.yaml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 6932c19e..08d268c1 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -7,6 +7,7 @@ import uuid from datetime import datetime import os +import shutil from airflow.operators.python import PythonOperator, get_current_context from airflow.models.param import Param diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 54524252..4591132a 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -9,7 +9,7 @@ spec: containers: - name: cwl-docker # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 - image: 
ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop8 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop9 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: From b62e386e400582a5b06d835aa49f91bfcba607e6 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 11 Mar 2024 04:12:22 -0600 Subject: [PATCH 09/14] [CHANGE] Adding parameter for working directory --- airflow/dags/cwl_dag.py | 14 +++++++++----- airflow/dags/docker_cwl_pod.yaml | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 08d268c1..04e7b92c 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -29,7 +29,8 @@ # Example arguments default_cwl_workflow = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -default_args_as_json_dict = { +# default_cwl_args = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" +default_cwl_args = { "input_processing_labels": ["label1", "label2"], "input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z", "input_unity_dapa_client": "40c2s0ulbhp9i0fmaph3su9jch", @@ -60,12 +61,15 @@ "cwl_workflow": Param( default_cwl_workflow, type="string", title="CWL workflow", description="The CWL workflow URL" ), - "args_as_json": Param( - json.dumps(default_args_as_json_dict), + "cwl_args": Param( + json.dumps(default_cwl_args), type="string", - title="CWL wokflow parameters", + title="CWL workflow parameters", description="The job parameters encodes as a JSON string, or the URL of a JSON or YAML file", ), + "working_dir": Param( + WORKING_DIR, type="string", title="Working directory", description="Use '.' 
for EBS, '/scratch' for EFS" + ) }, ) @@ -95,7 +99,7 @@ def setup(ti=None, **context): metadata=k8s.V1ObjectMeta(name="docker-cwl-pod-" + uuid.uuid4().hex), ), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}", WORKING_DIR], + arguments=["{{ params.cwl_workflow }}", "{{ params.cwl_args }}", "{{ params.working_dir }}"], dag=dag, volume_mounts=[ k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 4591132a..3da918db 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -9,7 +9,7 @@ spec: containers: - name: cwl-docker # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop9 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop10 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: From 2a9e7373052ae91a39788c38c4f19eb277e7b4ec Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 11 Mar 2024 09:02:24 -0600 Subject: [PATCH 10/14] [CHANGE] Creating 2.0.0-alpha.2 --- airflow/dags/docker_cwl_pod.yaml | 3 +-- airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py | 7 ++++++- airflow/dags/sbg_preprocess_cwl_dag.py | 3 ++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 3da918db..ae66548e 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -8,8 +8,7 @@ spec: containers: - name: cwl-docker - # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop10 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.2 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py 
b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py index 93f21162..38e5e166 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py @@ -18,6 +18,11 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +# The path of the working directory where the CWL workflow is executed +# (aka the starting directory for cwl-runner). +# This is fixed to the EFS /scratch directory in this DAG. +WORKING_DIR = "/scratch" + # Default DAG configuration dag_default_args = { "owner": "unity-sps", @@ -93,7 +98,7 @@ def setup(ti=None, **context): task_id="SBG_L1_to_L2_End_To_End_CWL", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-l1-to-l2-e2e-cwl-pod-" + uuid.uuid4().hex))), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"], + arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR], # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, dag=dag, ) diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index 2b9aa918..c1e62652 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -21,7 +21,8 @@ POD_NAMESPACE = "airflow" # The path of the working directory where the CWL workflow is executed -# (aka the starting directory for cwl-runner) +# (aka the starting directory for cwl-runner). +# This is fixed to the EFS /scratch directory in this DAG. 
WORKING_DIR = "/scratch" # Default DAG configuration From ebe54824993dd6845e7c92ac81a2b339e6ed2bc3 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 11 Mar 2024 09:52:58 -0600 Subject: [PATCH 11/14] [CHANGE] Adding newline at the end of file --- airflow/dags/cwl_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 04e7b92c..1d03caf7 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -128,4 +128,4 @@ def cleanup(**context): dag=dag, ) -setup_task >> cwl_task >> cleanup_task \ No newline at end of file +setup_task >> cwl_task >> cleanup_task From 5f4bf0d6209122afb7cb784bdedb0c368583fb62 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 11 Mar 2024 11:33:52 -0600 Subject: [PATCH 12/14] [CHANGE] Using ephemeral-storage: "20Gi" --- airflow/dags/docker_cwl_pod.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index ae66548e..ab48fc20 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -13,3 +13,8 @@ spec: command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: privileged: true + resources: + requests: + ephemeral-storage: "20Gi" + limits: + ephemeral-storage: "20Gi" From fe7ae2203812938ae4dd49c068f47a8474bb6de4 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 11 Mar 2024 14:00:04 -0600 Subject: [PATCH 13/14] [CHANGE] Using /scratch EFS partition for the DIND Docker directory --- airflow/docker/cwl/docker_cwl_entrypoint.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/docker/cwl/docker_cwl_entrypoint.sh b/airflow/docker/cwl/docker_cwl_entrypoint.sh index 300e485d..c8905217 100755 --- a/airflow/docker/cwl/docker_cwl_entrypoint.sh +++ b/airflow/docker/cwl/docker_cwl_entrypoint.sh @@ -32,7 +32,10 @@ mkdir -p "$work_dir" cd $work_dir # Start Docker engine -dockerd &> dockerd-logfile & +# Move the Docker root 
directory to the larger EFS partition +docker_dir="$work_dir"/docker +mkdir -p "$docker_dir" +dockerd --data-root "$docker_dir" &> dockerd-logfile & # Wait until Docker engine is running # Loop until 'docker version' exits with 0. From faf17afc5835772bac3a6d94a59a4e41fe137a17 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 11 Mar 2024 14:27:15 -0600 Subject: [PATCH 14/14] [FIX] Pod calling the latest version of the DIND version --- airflow/dags/docker_cwl_pod.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index ab48fc20..dffd6cba 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -8,7 +8,8 @@ spec: containers: - name: cwl-docker - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.2 + # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.2 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop13 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: