diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 9ebd00ca..1d03caf7 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -6,7 +6,10 @@ import json import uuid from datetime import datetime +import os +import shutil +from airflow.operators.python import PythonOperator, get_current_context from airflow.models.param import Param from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s @@ -20,11 +23,14 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +# The path of the working directory where the CWL workflow is executed +# (aka the starting directory for cwl-runner) +WORKING_DIR = "/scratch" + # Example arguments -default_cwl_workflow = ( - "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -) -default_args_as_json_dict = { +default_cwl_workflow = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" +# default_cwl_args = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" +default_cwl_args = { "input_processing_labels": ["label1", "label2"], "input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z", "input_unity_dapa_client": "40c2s0ulbhp9i0fmaph3su9jch", @@ -35,7 +41,11 @@ } # Default DAG configuration -dag_default_args = {"owner": "unity-sps", "depends_on_past": False, "start_date": datetime(2024, 1, 1, 0, 0)} +dag_default_args = { + "owner": "unity-sps", + "depends_on_past": False, + "start_date": datetime(2024, 1, 1, 0, 0) +} # The DAG dag = DAG( @@ -45,24 +55,37 @@ is_paused_upon_creation=False, catchup=False, schedule_interval=None, - max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={
"cwl_workflow": Param( default_cwl_workflow, type="string", title="CWL workflow", description="The CWL workflow URL" ), - "args_as_json": Param( - json.dumps(default_args_as_json_dict), + "cwl_args": Param( + json.dumps(default_cwl_args), type="string", - title="CWL wokflow parameters", + title="CWL workflow parameters", description="The job parameters encodes as a JSON string, or the URL of a JSON or YAML file", ), + "working_dir": Param( + WORKING_DIR, type="string", title="Working directory", description="Use '.' for EBS, '/scratch' for EFS" + ) }, ) # Environment variables default_env_vars = {} +# This task creates the per-run working directory (/shared-task-data/<dag_run_id>) on the shared volume +def setup(ti=None, **context): + context = get_current_context() + dag_run_id = context["dag_run"].run_id + local_dir = f"/shared-task-data/{dag_run_id}" + os.makedirs(local_dir, exist_ok=True) + + +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) + # This section defines KubernetesPodOperator cwl_task = KubernetesPodOperator( namespace=POD_NAMESPACE, @@ -76,6 +99,33 @@ metadata=k8s.V1ObjectMeta(name="docker-cwl-pod-" + uuid.uuid4().hex), ), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}"], + arguments=["{{ params.cwl_workflow }}", "{{ params.cwl_args }}", "{{ params.working_dir }}"], dag=dag, + volume_mounts=[ + k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") + ], + volumes=[ + k8s.V1Volume( + name="workers-volume", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="kpo-efs"), + ) + ], ) + +def cleanup(**context): + dag_run_id = context["dag_run"].run_id + local_dir = f"/shared-task-data/{dag_run_id}" + if os.path.exists(local_dir): + shutil.rmtree(local_dir) + print(f"Deleted directory: {local_dir}") + else: + print(f"Directory does not exist, no need to delete: {local_dir}") + + +cleanup_task = PythonOperator( +
task_id="Cleanup", + python_callable=cleanup, + dag=dag, +) + +setup_task >> cwl_task >> cleanup_task diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 535aa59c..dffd6cba 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -8,8 +8,14 @@ spec: containers: - name: cwl-docker - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1 + # image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.2 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop13 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: privileged: true + resources: + requests: + ephemeral-storage: "20Gi" + limits: + ephemeral-storage: "20Gi" diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py index deea94d5..38e5e166 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py @@ -18,6 +18,11 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +# The path of the working directory where the CWL workflow is executed +# (aka the starting directory for cwl-runner). +# This is fixed to the EFS /scratch directory in this DAG. 
+WORKING_DIR = "/scratch" + # Default DAG configuration dag_default_args = { "owner": "unity-sps", @@ -34,7 +39,7 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "cwl_workflow": Param(CWL_WORKFLOW, type="string"), @@ -54,6 +59,7 @@ # For unity data upload step, unity catalog "output_isofit_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_RFL___1", type="string"), "output_resample_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_RSRFL___1", type="string"), + "output_refcorrect_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_CORFL___1", type="string"), }, ) @@ -73,6 +79,7 @@ def setup(ti=None, **context): "input_aux_stac": context["params"]["input_aux_stac"], "output_isofit_collection_id": context["params"]["output_isofit_collection_id"], "output_resample_collection_id": context["params"]["output_resample_collection_id"], + "output_refcorrect_collection_id": context["params"]["output_refcorrect_collection_id"], } ti.xcom_push(key="cwl_args", value=json.dumps(task_dict)) @@ -91,7 +98,7 @@ def setup(ti=None, **context): task_id="SBG_L1_to_L2_End_To_End_CWL", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-l1-to-l2-e2e-cwl-pod-" + uuid.uuid4().hex))), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"], + arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR], # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, dag=dag, ) diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index a4549c9c..c1e62652 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -20,6 +20,10 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +# The path of the working directory where the 
CWL workflow is executed +# (aka the starting directory for cwl-runner). +# This is fixed to the EFS /scratch directory in this DAG. +WORKING_DIR = "/scratch" # Default DAG configuration dag_default_args = { @@ -27,9 +31,7 @@ "depends_on_past": False, "start_date": datetime.utcfromtimestamp(0), } -CWL_WORKFLOW = ( - "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -) +CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" CMR_STAC = "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z" dag = DAG( @@ -39,15 +41,11 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "cwl_workflow": Param(CWL_WORKFLOW, type="string"), "input_cmr_stac": Param(CMR_STAC, type="string"), - # "input_processing_labels": Param(["label1", "label2"], type="string[]"), - # "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"), - # "input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"), - # "input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"), "input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"), "input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"), "input_crid": Param("001", type="string"), @@ -67,9 +65,6 @@ def setup(ti=None, **context): task_dict = { "input_processing_labels": ["label1", "label2"], "input_cmr_stac": context["params"]["input_cmr_stac"], - # "input_cmr_collection_name": context["params"]["input_cmr_collection_name"], - # "input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"], - # "input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"], "input_unity_dapa_api": 
context["params"]["input_unity_dapa_api"], "input_unity_dapa_client": context["params"]["input_unity_dapa_client"], "input_crid": context["params"]["input_crid"], @@ -93,11 +88,10 @@ def setup(ti=None, **context): task_id="SBG_Preprocess_CWL", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", "/scratch"], - # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, + arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR], dag=dag, volume_mounts=[ - k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}") + k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") ], volumes=[ k8s.V1Volume( diff --git a/airflow/dags/sbg_preprocess_no_cwl.py b/airflow/dags/sbg_preprocess_no_cwl.py index 5c4159ff..0863600b 100644 --- a/airflow/dags/sbg_preprocess_no_cwl.py +++ b/airflow/dags/sbg_preprocess_no_cwl.py @@ -62,7 +62,7 @@ is_paused_upon_creation=True, catchup=False, schedule=None, - max_active_runs=1, + max_active_runs=100, default_args=dag_default_args, params={ "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"), diff --git a/airflow/docker/cwl/docker_cwl_entrypoint.sh b/airflow/docker/cwl/docker_cwl_entrypoint.sh index 3beac4a2..c8905217 100755 --- a/airflow/docker/cwl/docker_cwl_entrypoint.sh +++ b/airflow/docker/cwl/docker_cwl_entrypoint.sh @@ -10,7 +10,7 @@ # $3: optional working directory, defaults to the current directory # Note: The working must be accessible by the Docker container that executes this script -set -e +set -ex cwl_workflow=$1 job_args=$2 work_dir=${3:-.} @@ -32,7 +32,10 @@ mkdir -p "$work_dir" cd $work_dir # Start Docker engine -dockerd &> dockerd-logfile & +# Move the Docker root directory to the larger EFS 
partition +docker_dir="$work_dir"/docker +mkdir -p "$docker_dir" +dockerd --data-root "$docker_dir" &> dockerd-logfile & # Wait until Docker engine is running # Loop until 'docker version' exits with 0. @@ -43,7 +46,8 @@ done # Execute CWL workflow . /usr/share/cwl/venv/bin/activate -cwl-runner "$cwl_workflow" "$job_args" +pwd +cwl-runner --tmp-outdir-prefix "$PWD"/ "$cwl_workflow" "$job_args" deactivate # Stop Docker engine