Skip to content

Commit

Permalink
Merge pull request #7 from unity-sds/259-cwl
Browse files Browse the repository at this point in the history
Add working prototype of CWL workflow integration
  • Loading branch information
drewm-swe authored Feb 26, 2024
2 parents efbc7cd + 2fc77e3 commit f4b9e01
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 4 deletions.
20 changes: 20 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## Purpose

- Clear, easy-to-understand sentences outlining the purpose of the PR

## Proposed Changes

- [ADD] ...
- [CHANGE] ...
- [FIX] ...

## Issues

- Links to relevant issues
- Example: issue-XYZ

## Testing

- Provide some proof you've tested your changes
- Example: test results available at ...
- Example: tested on operating system ...
14 changes: 14 additions & 0 deletions airflow/dags/docker_cwl_pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: v1
kind: Pod
metadata:
name: docker-cwl-pod
spec:
restartPolicy: Never
serviceAccountName: airflow-worker

containers:
- name: cwl-docker
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:development-tag2
command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
securityContext:
privileged: true
91 changes: 91 additions & 0 deletions airflow/dags/sbg_preprocess_cwl_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# DAG for executing the SBG Preprocess Workflow
# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl
import json
import uuid
from datetime import datetime

from airflow.models.param import Param
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

from airflow import DAG

# The Kubernetes Pod that executes the CWL-Docker container
# Must use elevated privileges to start/stop the Docker engine
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"


# Default DAG configuration
dag_default_args = {
"owner": "unity-sps",
"depends_on_past": False,
"start_date": datetime.utcfromtimestamp(0),
}
CWL_WORKFLOW = (
"https://raw.githubusercontent.com/unity-sds/sbg-workflows/1.0/preprocess/sbg-preprocess-workflow.cwl"
)

dag = DAG(
dag_id="sbg-preprocess-cwl-dag",
description="SBG Preprocess Workflow as CWL",
tags=["SBG", "Unity", "SPS", "NASA", "JPL"],
is_paused_upon_creation=False,
catchup=False,
schedule=None,
max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
# "input_processing_labels": Param(["label1", "label2"], type="string[]"),
"input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"),
"input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"),
"input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"),
"input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
"input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
"input_crid": Param("001", type="string"),
"output_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", type="string"),
"output_data_bucket": Param("sps-dev-ds-storage", type="string"),
},
)


# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):
task_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_collection_name": context["params"]["input_cmr_collection_name"],
"input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"],
"input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"],
"input_unity_dapa_api": context["params"]["input_unity_dapa_api"],
"input_unity_dapa_client": context["params"]["input_unity_dapa_client"],
"input_crid": context["params"]["input_crid"],
"output_collection_id": context["params"]["output_collection_id"],
"output_data_bucket": context["params"]["output_data_bucket"],
}
ti.xcom_push(key="cwl_args", value=json.dumps(task_dict))


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)


# Task that executes the specific CWL workflow with the previous arguments
cwl_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="SBG_Preprocess_CWL",
on_finish_action="delete_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
get_logs=True,
task_id="SBG_Preprocess_CWL",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
# resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
dag=dag,
)

setup_task >> cwl_task
3 changes: 3 additions & 0 deletions airflow/helm/values.tmpl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ workers:
- type: Percent
value: 100
periodSeconds: 5
serviceAccount:
annotations:
eks.amazonaws.com/role-arn: "${airflow_worker_role_arn}"

data:
metadataSecretName: ${metadata_secret_name}
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ test = [
"requests==2.31.0",
"apache-airflow==2.8.1",
"kubernetes==29.0.0",
"boto3==1.34.46"
"boto3==1.34.46",
"apache-airflow-providers-cncf-kubernetes==8.0.0"
]
experiment = []

Expand Down
2 changes: 1 addition & 1 deletion terraform-unity/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ No resources.
|------|-------------|------|---------|:--------:|
| <a name="input_airflow_webserver_password"></a> [airflow\_webserver\_password](#input\_airflow\_webserver\_password) | value | `string` | n/a | yes |
| <a name="input_counter"></a> [counter](#input\_counter) | value | `string` | `""` | no |
| <a name="input_custom_airflow_docker_image"></a> [custom\_airflow\_docker\_image](#input\_custom\_airflow\_docker\_image) | Docker image for the customized Airflow image. | <pre>object({<br> name = string<br> tag = string<br> })</pre> | <pre>{<br> "name": "ghcr.io/unity-sds/unity-sps-prototype/sps-airflow",<br> "tag": "develop"<br>}</pre> | no |
| <a name="input_custom_airflow_docker_image"></a> [custom\_airflow\_docker\_image](#input\_custom\_airflow\_docker\_image) | Docker image for the customized Airflow image. | <pre>object({<br> name = string<br> tag = string<br> })</pre> | <pre>{<br> "name": "ghcr.io/unity-sds/unity-sps/sps-airflow",<br> "tag": "develop"<br>}</pre> | no |
| <a name="input_eks_cluster_name"></a> [eks\_cluster\_name](#input\_eks\_cluster\_name) | The name of the EKS cluster. | `string` | n/a | yes |
| <a name="input_helm_charts"></a> [helm\_charts](#input\_helm\_charts) | Settings for the required Helm charts. | <pre>map(object({<br> repository = string<br> chart = string<br> version = string<br> }))</pre> | <pre>{<br> "airflow": {<br> "chart": "airflow",<br> "repository": "https://airflow.apache.org",<br> "version": "1.11.0"<br> },<br> "keda": {<br> "chart": "keda",<br> "repository": "https://kedacore.github.io/charts",<br> "version": "v2.13.1"<br> }<br>}</pre> | no |
| <a name="input_kubeconfig_filepath"></a> [kubeconfig\_filepath](#input\_kubeconfig\_filepath) | Path to the kubeconfig file for the Kubernetes cluster | `string` | `"../k8s/kubernetes.yml"` | no |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ No modules.
| [random_password.airflow_db](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/password) | resource |
| [aws_eks_cluster.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster) | data source |
| [aws_eks_cluster_auth.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster_auth) | data source |
| [aws_eks_node_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_node_group) | data source |
| [aws_security_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/security_group) | data source |
| [aws_ssm_parameter.subnet_ids](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/ssm_parameter) | data source |
| [kubernetes_ingress_v1.airflow_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source |
Expand Down
5 changes: 5 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/data.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ data "kubernetes_ingress_v1" "airflow_ingress" {
namespace = kubernetes_namespace.airflow.metadata[0].name
}
}

data "aws_eks_node_group" "default" {
cluster_name = var.eks_cluster_name
node_group_name = "defaultGroup"
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ resource "kubernetes_secret" "airflow_webserver" {
}
}


resource "kubernetes_role" "airflow_pod_creator" {
metadata {
name = "airflow-pod-creator"
Expand Down Expand Up @@ -229,6 +228,7 @@ resource "helm_release" "airflow" {
metadata_secret_name = "airflow-metadata-secret"
webserver_secret_name = "airflow-webserver-secret"
airflow_logs_s3_location = "s3://${aws_s3_bucket.airflow_logs.id}"
airflow_worker_role_arn = data.aws_eks_node_group.default.node_role_arn
})
]
set_sensitive {
Expand Down
2 changes: 1 addition & 1 deletion terraform-unity/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ variable "custom_airflow_docker_image" {
tag = string
})
default = {
name = "ghcr.io/unity-sds/unity-sps-prototype/sps-airflow"
name = "ghcr.io/unity-sds/unity-sps/sps-airflow"
tag = "develop"
}
}

0 comments on commit f4b9e01

Please sign in to comment.