
Commit

Merge pull request #319 from unity-sds/stop-using-efs-in-cws-dag
Stop using the EFS partition in the CWL DAG
LucaCinquini authored Feb 14, 2025
2 parents 8a96441 + ff268ee commit a1ee141
Showing 6 changed files with 112 additions and 100 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config-ci.yaml
@@ -31,6 +31,7 @@ repos:
rev: 5.13.2
hooks:
- id: isort
args: ["--profile=black"]

- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.1.1
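The --profile=black argument (added to both pre-commit configs) makes isort emit black-compatible import blocks: parenthesized multi-line imports, one name per line, with a trailing comma. A minimal before/after illustration, abridged from the import that actually changes in cwl_dag.py below:

# Before: isort's default hanging-grid style, which black would reflow.
from unity_sps_utils import (EC2_TYPES, NODE_POOL_DEFAULT,
                             NODE_POOL_HIGH_WORKLOAD, get_affinity)

# After: with --profile=black, isort produces the layout black expects,
# matching the import block added to cwl_dag.py in this commit.
from unity_sps_utils import (
    EC2_TYPES,
    NODE_POOL_DEFAULT,
    NODE_POOL_HIGH_WORKLOAD,
    get_affinity,
)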
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -31,6 +31,7 @@ repos:
rev: 5.13.2
hooks:
- id: isort
args: ["--profile=black"]

- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.4.2
120 changes: 24 additions & 96 deletions airflow/dags/cwl_dag.py
@@ -19,24 +19,26 @@
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import get_affinity
from unity_sps_utils import (
EC2_TYPES,
NODE_POOL_DEFAULT,
NODE_POOL_HIGH_WORKLOAD,
POD_LABEL,
POD_NAMESPACE,
build_ec2_type_label,
get_affinity,
)

from airflow import DAG

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "sps"

# Note: each Pod is assigned the same label to assure that (via the anti-affinity requirements)
# two Pods with the same label cannot run on the same Node
POD_LABEL = "cwl_task"
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.5.1"
NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator"
NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload"
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.5.3"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner).
# This is fixed to the EFS /scratch directory in this DAG.
WORKING_DIR = "/scratch"
# WORKING_DIR = "/scratch"

# default parameters
DEFAULT_CWL_WORKFLOW = (
@@ -50,78 +52,6 @@
}
)

EC2_TYPES = {
"t3.micro": {
"desc": "General Purpose",
"cpu": 1,
"memory": 1,
},
"t3.small": {
"desc": "General Purpose",
"cpu": 2,
"memory": 2,
},
"t3.medium": {
"desc": "General Purpose",
"cpu": 2,
"memory": 4,
},
"t3.large": {
"desc": "General Purpose",
"cpu": 2,
"memory": 8,
},
"t3.xlarge": {
"desc": "General Purpose",
"cpu": 4,
"memory": 16,
},
"t3.2xlarge": {
"desc": "General Purpose",
"cpu": 8,
"memory": 32,
},
"r7i.xlarge": {
"desc": "Memory Optimized",
"cpu": 4,
"memory": 32,
},
"r7i.2xlarge": {
"desc": "Memory Optimized",
"cpu": 8,
"memory": 64,
},
"r7i.4xlarge": {
"desc": "Memory Optimized",
"cpu": 16,
"memory": 128,
},
"r7i.8xlarge": {
"desc": "Memory Optimized",
"cpu": 32,
"memory": 256,
},
"c6i.xlarge": {
"desc": "Compute Optimized",
"cpu": 4,
"memory": 8,
},
"c6i.2xlarge": {
"desc": "Compute Optimized",
"cpu": 8,
"memory": 16,
},
"c6i.4xlarge": {
"desc": "Compute Optimized",
"cpu": 16,
"memory": 32,
},
"c6i.8xlarge": {
"desc": "Compute Optimized",
"cpu": 32,
"memory": 64,
},
}

# Default DAG configuration
dag_default_args = {
@@ -130,12 +60,6 @@
"start_date": datetime.utcfromtimestamp(0),
}


# "t3.large": "t3.large (General Purpose: 2vCPU, 8GiB)",
def build_ec2_type_label(key):
return f"{key} ({EC2_TYPES.get(key)['desc']}: {EC2_TYPES.get(key)['cpu']}vCPU, {EC2_TYPES.get(key)['memory']}GiB)"


dag = DAG(
dag_id="cwl_dag",
description="CWL DAG",
@@ -240,15 +164,15 @@ def setup(ti=None, **context):
},
),
container_logs=True,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
name="workers-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
)
],
# volume_mounts=[
# k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
# ],
# volumes=[
# k8s.V1Volume(
# name="workers-volume",
# persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
# )
# ],
dag=dag,
node_selector={
"karpenter.sh/nodepool": "{{ti.xcom_pull(task_ids='Setup', key='node_pool')}}",
@@ -275,6 +199,10 @@ def cleanup(**context):
dag_run_id = context["dag_run"].run_id
local_dir = f"/shared-task-data/{dag_run_id}"
if os.path.exists(local_dir):
logging.info(f"Content of directory: {local_dir}")
files = os.listdir(local_dir)
for f in files:
logging.info(os.path.join(local_dir, f))
shutil.rmtree(local_dir)
logging.info(f"Deleted directory: {local_dir}")
else:
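Assembled from the hunks above, the cleanup task keeps its original structure but now logs the contents of the per-run directory before deleting it. A sketch of the resulting function; the imports and the else-branch message are assumptions, since the diff is truncated at that point:

import logging
import os
import shutil


def cleanup(**context):
    dag_run_id = context["dag_run"].run_id
    local_dir = f"/shared-task-data/{dag_run_id}"
    if os.path.exists(local_dir):
        # New in this commit: list any leftover files before deletion.
        logging.info(f"Content of directory: {local_dir}")
        for f in os.listdir(local_dir):
            logging.info(os.path.join(local_dir, f))
        shutil.rmtree(local_dir)
        logging.info(f"Deleted directory: {local_dir}")
    else:
        # Assumed message; the diff cuts off before the else body.
        logging.info(f"Directory does not exist: {local_dir}")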
7 changes: 5 additions & 2 deletions airflow/docker/cwl/docker_cwl_entrypoint.sh
@@ -11,9 +11,12 @@
# (example: <aws_account_id>.dkr.ecr.<region>.amazonaws.com) [optional]
# -o: path to an output JSON file that needs to be shared as Airflow "xcom" data [optional]

# Must be the same as the path of the Persistent Volume mounted by the Airflow KubernetesPodOperator
# Set equal to the path of the EFS Persistent Volume mounted by the Airflow KubernetesPodOperator
# that executes this script
WORKING_DIR="/scratch"
# WORKING_DIR="/scratch"

# Set to a local path on the Pod EBS volume
WORKING_DIR="/data"

set -ex
while getopts w:j:e:o: flag
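The working directory moves from the shared EFS mount (/scratch) to /data on the Pod's local EBS storage, so the script no longer depends on a Persistent Volume being attached. A rough sketch of how the entrypoint might prepare that directory; the positional arguments, per-job directory naming, and cwl-runner invocation are illustrative assumptions, not the actual script:

#!/bin/sh
set -ex

# Local path on the Pod's EBS volume (previously the EFS mount /scratch).
WORKING_DIR="/data"

# Illustrative: take the workflow URL and job file as positional arguments
# (the real script parses -w/-j/-e/-o via the getopts loop shown above).
cwl_workflow="$1"
job_args="$2"

# Hypothetical per-job isolation: each run gets its own subdirectory.
JOB_DIR="${WORKING_DIR}/job-$$"
mkdir -p "${JOB_DIR}"
cd "${JOB_DIR}"

cwl-runner "${cwl_workflow}" "${job_args}"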
77 changes: 77 additions & 0 deletions airflow/plugins/unity_sps_utils.py
@@ -20,6 +20,83 @@

DS_S3_BUCKET_PARAM = f"/unity/unity/{os.environ['AIRFLOW_VAR_UNITY_VENUE']}/ds/datastore-bucket"

EC2_TYPES = {
"t3.micro": {
"desc": "General Purpose",
"cpu": 1,
"memory": 1,
},
"t3.small": {
"desc": "General Purpose",
"cpu": 2,
"memory": 2,
},
"t3.medium": {
"desc": "General Purpose",
"cpu": 2,
"memory": 4,
},
"t3.large": {
"desc": "General Purpose",
"cpu": 2,
"memory": 8,
},
"t3.xlarge": {
"desc": "General Purpose",
"cpu": 4,
"memory": 16,
},
"t3.2xlarge": {
"desc": "General Purpose",
"cpu": 8,
"memory": 32,
},
"r7i.xlarge": {
"desc": "Memory Optimized",
"cpu": 4,
"memory": 32,
},
"r7i.2xlarge": {
"desc": "Memory Optimized",
"cpu": 8,
"memory": 64,
},
"r7i.4xlarge": {
"desc": "Memory Optimized",
"cpu": 16,
"memory": 128,
},
"r7i.8xlarge": {
"desc": "Memory Optimized",
"cpu": 32,
"memory": 256,
},
"c6i.xlarge": {
"desc": "Compute Optimized",
"cpu": 4,
"memory": 8,
},
"c6i.2xlarge": {
"desc": "Compute Optimized",
"cpu": 8,
"memory": 16,
},
"c6i.4xlarge": {
"desc": "Compute Optimized",
"cpu": 16,
"memory": 32,
},
"c6i.8xlarge": {
"desc": "Compute Optimized",
"cpu": 32,
"memory": 64,
},
}


def build_ec2_type_label(key):
return f"{key} ({EC2_TYPES.get(key)['desc']}: {EC2_TYPES.get(key)['cpu']}vCPU, {EC2_TYPES.get(key)['memory']}GiB)"


class SpsKubernetesPodOperator(KubernetesPodOperator):
"""
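EC2_TYPES and build_ec2_type_label move out of cwl_dag.py into the shared unity_sps_utils plugin, where any DAG can import them. A quick usage sketch of the relocated helpers:

from unity_sps_utils import EC2_TYPES, build_ec2_type_label

# Human-readable label for the DAG's instance-type dropdown.
print(build_ec2_type_label("t3.large"))
# t3.large (General Purpose: 2vCPU, 8GiB)

# Raw resource numbers for a given type (vCPUs, memory in GiB).
spec = EC2_TYPES["r7i.2xlarge"]
print(spec["cpu"], spec["memory"])  # 8 64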
