Skip to content

Commit

Permalink
[CHANGE] Adding volume support to the generic cwl-dag DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaCinquini committed Mar 7, 2024
1 parent 696d0dc commit d0b7c8b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
55 changes: 50 additions & 5 deletions airflow/dags/cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import json
import uuid
from datetime import datetime
import os

from airflow.operators.python import PythonOperator, get_current_context
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
Expand All @@ -20,10 +22,12 @@
# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner)
WORKING_DIR = "/scratch"

# Example arguments
default_cwl_workflow = (
"https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
)
default_cwl_workflow = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
default_args_as_json_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z",
Expand All @@ -35,7 +39,11 @@
}

# Default DAG configuration
dag_default_args = {"owner": "unity-sps", "depends_on_past": False, "start_date": datetime(2024, 1, 1, 0, 0)}
dag_default_args = {
"owner": "unity-sps",
"depends_on_past": False,
"start_date": datetime(2024, 1, 1, 0, 0)
}

# The DAG
dag = DAG(
Expand Down Expand Up @@ -63,6 +71,16 @@
# Environment variables
default_env_vars = {}

# This task that creates the working directory on the shared volume
def setup(ti=None, **context):
context = get_current_context()
dag_run_id = context["dag_run"].run_id
local_dir = os.path.dirname(f"/shared-task-data/{dag_run_id}")
os.makedirs(local_dir, exist_ok=True)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)

# This section defines KubernetesPodOperator
cwl_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
Expand All @@ -76,6 +94,33 @@
metadata=k8s.V1ObjectMeta(name="docker-cwl-pod-" + uuid.uuid4().hex),
),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}"],
arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}", WORKING_DIR],
dag=dag,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
name="workers-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="kpo-efs"),
)
],
)

def cleanup(**context):
dag_run_id = context["dag_run"].run_id
local_dir = f"/shared-task-data/{dag_run_id}"
if os.path.exists(local_dir):
shutil.rmtree(local_dir)
print(f"Deleted directory: {local_dir}")
else:
print(f"Directory does not exist, no need to delete: {local_dir}")


cleanup_task = PythonOperator(
task_id="Cleanup",
python_callable=cleanup,
dag=dag,
)

setup_task >> cwl_task >> cleanup_task
2 changes: 1 addition & 1 deletion airflow/dags/docker_cwl_pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
containers:
- name: cwl-docker
# image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop7
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop8
imagePullPolicy: Always
command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
securityContext:
Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/sbg_preprocess_cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def setup(ti=None, **context):
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR],
dag=dag,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}")
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
Expand Down

0 comments on commit d0b7c8b

Please sign in to comment.