Skip to content

Commit

Permalink
Merge pull request #12 from unity-sds/264-execution-of-additional-sbg…
Browse files Browse the repository at this point in the history
…-workflows

#264 Execution of additional SBG workflows
  • Loading branch information
drewm-swe authored Mar 5, 2024
2 parents ee6041d + 186b11b commit b733337
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 11 deletions.
13 changes: 10 additions & 3 deletions airflow/dags/cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@
dag_id="cwl-dag",
description="DAG to execute a generic CWL workflow",
tags=["cwl", "unity-sps", "docker"],
is_paused_upon_creation=True,
is_paused_upon_creation=False,
catchup=False,
schedule_interval=None,
max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(default_cwl_workflow, type="string"),
"args_as_json": Param(json.dumps(default_args_as_json_dict), type="string"),
"cwl_workflow": Param(
default_cwl_workflow, type="string", title="CWL workflow", description="The CWL workflow URL"
),
"args_as_json": Param(
json.dumps(default_args_as_json_dict),
type="string",
title="CWL wokflow parameters",
description="The job parameters encodes as a JSON string, or the URL of a JSON or YAML file",
),
},
)

Expand Down
3 changes: 2 additions & 1 deletion airflow/dags/docker_cwl_pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ spec:

containers:
- name: cwl-docker
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:develop-1
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-alpha.1
imagePullPolicy: Always
command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
securityContext:
privileged: true
99 changes: 99 additions & 0 deletions airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# DAG for executing the SBG L1-to-L2 End-To-End Workflow
# See https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.cwl
import json
import uuid
from datetime import datetime

from airflow.models.param import Param
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

from airflow import DAG

# The Kubernetes Pod that executes the CWL-Docker container
# Must use elevated privileges to start/stop the Docker engine
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"

# Default DAG configuration
dag_default_args = {
"owner": "unity-sps",
"depends_on_past": False,
"start_date": datetime.utcfromtimestamp(0),
}
CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
INPUT_AUX_STAC = '{"numberMatched":{"total_size":1},"numberReturned":1,"stac_version":"1.0.0","type":"FeatureCollection","links":[{"rel":"self","href":"https://d3vc8w9zcq658.cloudfront.net/am-uds-dapa/collections/urn:nasa:unity:unity:dev:SBG-L1B_PRE___1/items?limit=10"},{"rel":"root","href":"https://d3vc8w9zcq658.cloudfront.net"}],"features":[{"type":"Feature","stac_version":"1.0.0","id":"urn:nasa:unity:unity:dev:SBG-AUX___1:sRTMnet_v120","properties":{"datetime":"2024-02-14T22:04:41.078000Z","start_datetime":"2024-01-03T13:19:36Z","end_datetime":"2024-01-03T13:19:48Z","created":"2024-01-03T13:19:36Z","updated":"2024-02-14T22:05:25.248000Z","status":"completed","provider":"unity"},"geometry":{"type":"Point","coordinates":[0,0]},"links":[{"rel":"collection","href":"."}],"assets":{"sRTMnet_v120.h5":{"href":"s3://sps-dev-ds-storage/urn:nasa:unity:unity:dev:SBG-AUX___1/urn:nasa:unity:unity:dev:SBG-AUX___1:sRTMnet_v120.h5/sRTMnet_v120.h5","title":"sRTMnet_v120.h5","description":"size=-1;checksumType=md5;checksum=unknown;","roles":["data"]},"sRTMnet_v120_aux.npz":{"href":"s3://sps-dev-ds-storage/urn:nasa:unity:unity:dev:SBG-AUX___1/urn:nasa:unity:unity:dev:SBG-AUX___1:sRTMnet_v120.h5/sRTMnet_v120_aux.npz","title":"sRTMnet_v120_aux.npz","description":"size=-1;checksumType=md5;checksum=unknown;","roles":["data"]}},"bbox":[-180,-90,180,90],"stac_extensions":[],"collection":"urn:nasa:unity:unity:dev:SBG-AUX___1"}]}'

dag = DAG(
dag_id="sbg-l1-to-l2-e2e-cwl-dag",
description="SBG L1 to L2 End-To-End Workflow as CWL",
tags=["SBG", "Unity", "SPS", "NASA", "JPL"],
is_paused_upon_creation=False,
catchup=False,
schedule=None,
max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
# For CMR Search Step
"input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"),
"input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"),
"input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"),
# U-DS config
"input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
"input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
#
"input_crid": Param("001", type="string"),
# For unity data upload step, unity catalog
"output_preprocess_collection_id": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
"output_data_bucket": Param("sps-dev-ds-storage", type="string"),
"input_aux_stac": Param(INPUT_AUX_STAC, type="string"),
# For unity data upload step, unity catalog
"output_isofit_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_RFL___1", type="string"),
"output_resample_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L2A_RSRFL___1", type="string"),
},
)


# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):
task_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_collection_name": context["params"]["input_cmr_collection_name"],
"input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"],
"input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"],
"input_unity_dapa_api": context["params"]["input_unity_dapa_api"],
"input_unity_dapa_client": context["params"]["input_unity_dapa_client"],
"input_crid": context["params"]["input_crid"],
"output_preprocess_collection_id": context["params"]["output_preprocess_collection_id"],
"output_data_bucket": context["params"]["output_data_bucket"],
"input_aux_stac": context["params"]["input_aux_stac"],
"output_isofit_collection_id": context["params"]["output_isofit_collection_id"],
"output_resample_collection_id": context["params"]["output_resample_collection_id"],
}
ti.xcom_push(key="cwl_args", value=json.dumps(task_dict))


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)


# Task that executes the specific CWL workflow with the previous arguments
cwl_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="SBG_L1_to_L2_End_To_End_CWL",
on_finish_action="delete_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
get_logs=True,
task_id="SBG_L1_to_L2_End_To_End_CWL",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-l1-to-l2-e2e-cwl-pod-" + uuid.uuid4().hex))),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
# resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
dag=dag,
)

setup_task >> cwl_task
27 changes: 20 additions & 7 deletions airflow/docker/cwl/docker_cwl_entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
#!/bin/sh
# Script to execute a CWL workflow that includes Docker containers
# The Docker engine is started before the CWL execution, and stopped afterwards.
# $1: the CWL workflow URL (example: https://raw.githubusercontent.com/unity-sds/unity-sps-prototype/cwl-docker/cwl/cwl_workflows/echo_from_docker.cwl)
# $2: the CWL job parameters as a JSON formatted string (example: { name: John Doe })
# $1: the CWL workflow URL
# (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.cwl)
# $2: a) the CWL job parameters as a JSON formatted string
# (example: { "name": "John Doe" })
# OR b) The URL of a YAML or JSON file containing the job parameters
# (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.dev.yml)
# $3: optional output directory, defaults to the current directory
# Note: $output_dir must be accessible by the Docker container that executes this script

set -ex
set -e
cwl_workflow=$1
job_args=$2
output_dir=${3:-.}
echo "Executing CWL workflow: $cwl_workflow with json arguments: $job_args and output directory: $output_dir"
echo "$job_args" > /tmp/job_args.json
cat /tmp/job_args.json

# switch between the 2 cases a) and b) for job_args
if [ "$job_args" = "${job_args#{}" ]
then
# job_args does NOT start with '{'
echo "Using job arguments from URL: $job_args"
else
echo "$job_args" > /tmp/job_args.json
echo "Using job arguments from JSON string:" && cat /tmp/job_args.json
job_args="/tmp/job_args.json"
fi
echo "Executing the CWL workflow: $cwl_workflow with json arguments: $job_args and output directory: $output_dir"

# create output directory if it doesn't exist
mkdir -p "$output_dir"
Expand All @@ -29,7 +42,7 @@ done

# Execute CWL workflow
. /usr/share/cwl/venv/bin/activate
cwl-runner --outdir "$output_dir" "$cwl_workflow" /tmp/job_args.json
cwl-runner --outdir "$output_dir" "$cwl_workflow" "$job_args"
deactivate

# Stop Docker engine
Expand Down

0 comments on commit b733337

Please sign in to comment.