Commit

Merge branch '279-pod-pvc' into 264-execution-of-additional-sbg-workflows

LucaCinquini committed Mar 7, 2024
2 parents 8f48358 + fbbfd95 commit 77ddd34
Showing 7 changed files with 226 additions and 13 deletions.
56 changes: 53 additions & 3 deletions airflow/dags/hello_world.py
@@ -6,6 +6,7 @@
# Usage
""" # noqa: E501

import os
import time
from datetime import datetime

@@ -24,13 +25,62 @@ def hello_world():
time.sleep(30)


def write_to_shared_data():
file_path = "/shared-task-data/test_file.txt" # Adjust the path as necessary
with open(file_path, "w") as f:
f.write("This is a test file written at " + str(datetime.now()) + "\n")
print(f"Successfully written to {file_path}")


def read_from_shared_data():
file_path = "/shared-task-data/test_file.txt" # Adjust the path as necessary
try:
with open(file_path, "r") as f:
contents = f.read()
print(f"File contents:\n{contents}")
except FileNotFoundError:
print("File not found. Make sure the file path is correct.")


def delete_shared_data_file():
file_path = "/shared-task-data/test_file.txt" # Adjust the path as necessary
try:
os.remove(file_path)
print(f"Successfully deleted {file_path}")
except FileNotFoundError:
print("File not found. Make sure the file path is correct.")


with DAG(
dag_id="hello_world",
doc_md=__doc__,
default_args=default_args,
schedule=None,
is_paused_upon_creation=False,
tags=["test"],
) as dag:
hello_world_task = PythonOperator(task_id="hello_world", python_callable=hello_world)
hello_world_task
hello_world_task = PythonOperator(
task_id="hello_world",
python_callable=hello_world,
)

write_to_shared_data_task = PythonOperator(
task_id="write_to_shared_data",
python_callable=write_to_shared_data,
)

read_from_shared_data_task = PythonOperator(
task_id="read_from_shared_data",
python_callable=read_from_shared_data,
)

delete_shared_data_file_task = PythonOperator(
task_id="delete_shared_data_file",
python_callable=delete_shared_data_file,
)

(
hello_world_task
>> write_to_shared_data_task
>> read_from_shared_data_task
>> delete_shared_data_file_task
)
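Note: the hello_world DAG exercises the shared /shared-task-data mount with a fixed file path, so concurrent runs would overwrite each other's test file; the sbg_preprocess DAG below scopes its scratch space by dag_run.run_id instead. A minimal sketch of that per-run pattern, assuming the same /shared-task-data worker mount introduced by the Helm values in this commit (the helper below is illustrative, not part of the commit):

import os
from datetime import datetime

from airflow.operators.python import get_current_context

SHARED_ROOT = "/shared-task-data"  # worker mount from the Helm values in this commit


def run_scoped_path(filename: str) -> str:
    # Hypothetical helper: place files under a per-run directory so
    # concurrent DAG runs do not clobber each other's test data.
    run_id = get_current_context()["dag_run"].run_id
    run_dir = os.path.join(SHARED_ROOT, run_id)
    os.makedirs(run_dir, exist_ok=True)
    return os.path.join(run_dir, filename)


def write_to_shared_data_scoped():
    path = run_scoped_path("test_file.txt")
    with open(path, "w") as f:
        f.write(f"written at {datetime.now()}\n")
    print(f"Successfully written to {path}")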
40 changes: 37 additions & 3 deletions airflow/dags/sbg_preprocess_cwl_dag.py
@@ -1,11 +1,13 @@
# DAG for executing the SBG Preprocess Workflow
# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl
import json
import os
import shutil
import uuid
from datetime import datetime

from airflow.models.param import Param
from airflow.operators.python import PythonOperator
from airflow.operators.python import PythonOperator, get_current_context
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

@@ -57,6 +59,11 @@

# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):
context = get_current_context()
dag_run_id = context["dag_run"].run_id
local_dir = os.path.dirname(f"/shared-task-data/{dag_run_id}")
os.makedirs(local_dir, exist_ok=True)

task_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_stac": context["params"]["input_cmr_stac"],
@@ -86,9 +93,36 @@ def setup(ti=None, **context):
task_id="SBG_Preprocess_CWL",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", "/scratch"],
# resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
dag=dag,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
name="workers-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="kpo-efs"),
)
],
)


def cleanup(**context):
dag_run_id = context["dag_run"].run_id
local_dir = f"/shared-task-data/{dag_run_id}"
if os.path.exists(local_dir):
shutil.rmtree(local_dir)
print(f"Deleted directory: {local_dir}")
else:
print(f"Directory does not exist, no need to delete: {local_dir}")


cleanup_task = PythonOperator(
task_id="Cleanup",
python_callable=cleanup,
dag=dag,
)

setup_task >> cwl_task

setup_task >> cwl_task >> cleanup_task
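Note: Cleanup is wired strictly downstream of SBG_Preprocess_CWL, so a failed CWL run leaves its /shared-task-data/{run_id} directory behind; also, setup's os.path.dirname(...) resolves to the parent /shared-task-data, with the per-run subdirectory apparently created by the pod's sub_path mount. A sketch of a cleanup task that runs regardless of upstream outcome — an illustrative variant, not what this commit does:

import os
import shutil

from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def cleanup_always(**context):
    # Remove the per-run scratch directory even when upstream tasks fail.
    run_dir = f"/shared-task-data/{context['dag_run'].run_id}"
    if os.path.exists(run_dir):
        shutil.rmtree(run_dir)
        print(f"Deleted directory: {run_dir}")


cleanup_task = PythonOperator(
    task_id="Cleanup",
    python_callable=cleanup_always,
    trigger_rule=TriggerRule.ALL_DONE,  # run whether upstream succeeded or failed
    dag=dag,
)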
15 changes: 8 additions & 7 deletions airflow/docker/cwl/docker_cwl_entrypoint.sh
@@ -7,13 +7,13 @@
# (example: { "name": "John Doe" })
# OR b) The URL of a YAML or JSON file containing the job parameters
# (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.dev.yml)
# $3: optional output directory, defaults to the current directory
# Note: $output_dir must be accessible by the Docker container that executes this script
# $3: optional working directory, defaults to the current directory
# Note: The working directory must be accessible by the Docker container that executes this script

set -e
cwl_workflow=$1
job_args=$2
output_dir=${3:-.}
work_dir=${3:-.}

# switch between the 2 cases a) and b) for job_args
if [ "$job_args" = "${job_args#{}" ]
@@ -25,10 +25,11 @@ else
echo "Using job arguments from JSON string:" && cat /tmp/job_args.json
job_args="/tmp/job_args.json"
fi
echo "Executing the CWL workflow: $cwl_workflow with json arguments: $job_args and output directory: $output_dir"
echo "Executing the CWL workflow: $cwl_workflow with json arguments: $job_args and working directory: $work_dir"

# create output directory if it doesn't exist
mkdir -p "$output_dir"
# create working directory if it doesn't exist
mkdir -p "$work_dir"
cd $work_dir

# Start Docker engine
dockerd &> dockerd-logfile &
@@ -42,7 +43,7 @@ done

# Execute CWL workflow
. /usr/share/cwl/venv/bin/activate
cwl-runner --outdir "$output_dir" "$cwl_workflow" "$job_args"
cwl-runner "$cwl_workflow" "$job_args"
deactivate

# Stop Docker engine
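Note: the entrypoint distinguishes case a) inline JSON from case b) a URL by testing whether $2 starts with "{" — that is what the "${job_args#{}" parameter expansion checks. A small Python restatement of the argument contract, for reference only, reusing the example value from the script's comments:

# $1 = CWL workflow URL, $2 = job arguments, $3 = optional working directory (defaults to ".")
job_args = '{ "name": "John Doe" }'  # example for case a); case b) would be a YAML/JSON URL

# Equivalent of the shell test: [ "$job_args" = "${job_args#{}" ]
if not job_args.startswith("{"):
    print("case b): fetch the YAML/JSON job parameters from the URL")
else:
    print("case a): write the inline JSON to /tmp/job_args.json and pass that path to cwl-runner")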
8 changes: 8 additions & 0 deletions airflow/helm/values.tmpl.yaml
@@ -88,6 +88,14 @@ workers:
serviceAccount:
annotations:
eks.amazonaws.com/role-arn: "${airflow_worker_role_arn}"
extraVolumes:
- name: workers-volume
persistentVolumeClaim:
claimName: ${workers_pvc_name}
extraVolumeMounts:
- name: workers-volume
mountPath: /shared-task-data
readOnly: false

data:
metadataSecretName: ${metadata_secret_name}
9 changes: 9 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/README.md
@@ -32,13 +32,18 @@ No modules.
|------|------|
| [aws_db_instance.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_instance) | resource |
| [aws_db_subnet_group.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_subnet_group) | resource |
| [aws_efs_access_point.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_access_point) | resource |
| [aws_efs_file_system.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_file_system) | resource |
| [aws_efs_mount_target.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_mount_target) | resource |
| [aws_iam_policy.airflow_worker_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource |
| [aws_iam_role.airflow_worker_role](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource |
| [aws_iam_role_policy_attachment.airflow_worker_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource |
| [aws_s3_bucket.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource |
| [aws_secretsmanager_secret.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret) | resource |
| [aws_secretsmanager_secret_version.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret_version) | resource |
| [aws_security_group.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource |
| [aws_security_group.rds_sg](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource |
| [aws_security_group_rule.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource |
| [aws_security_group_rule.eks_egress_to_rds](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource |
| [aws_security_group_rule.rds_ingress_from_eks](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource |
| [helm_release.airflow](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource |
@@ -48,11 +53,14 @@ No modules.
| [kubernetes_ingress_v1.ogc_processes_api_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/ingress_v1) | resource |
| [kubernetes_namespace.airflow](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/namespace) | resource |
| [kubernetes_namespace.keda](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/namespace) | resource |
| [kubernetes_persistent_volume.efs_pv](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/persistent_volume) | resource |
| [kubernetes_persistent_volume_claim.efs_pvc](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/persistent_volume_claim) | resource |
| [kubernetes_role.airflow_pod_creator](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/role) | resource |
| [kubernetes_role_binding.airflow_pod_creator_binding](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/role_binding) | resource |
| [kubernetes_secret.airflow_metadata](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource |
| [kubernetes_secret.airflow_webserver](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource |
| [kubernetes_service.ogc_processes_api](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/service) | resource |
| [kubernetes_storage_class.efs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource |
| [null_resource.remove_finalizers](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource |
| [random_id.airflow_webserver_secret](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource |
| [random_id.counter](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource |
@@ -62,6 +70,7 @@ No modules.
| [aws_eks_cluster_auth.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster_auth) | data source |
| [aws_security_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/security_group) | data source |
| [aws_ssm_parameter.subnet_ids](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/ssm_parameter) | data source |
| [aws_vpc.cluster_vpc](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/vpc) | data source |
| [kubernetes_ingress_v1.airflow_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source |
| [kubernetes_ingress_v1.ogc_processes_api_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source |

4 changes: 4 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/data.tf
@@ -8,6 +8,10 @@ data "aws_eks_cluster_auth" "cluster" {
name = var.eks_cluster_name
}

data "aws_vpc" "cluster_vpc" {
id = data.aws_eks_cluster.cluster.vpc_config[0].vpc_id
}

data "aws_ssm_parameter" "subnet_ids" {
name = "/unity/cs/account/network/subnet_list"
}
107 changes: 107 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/main.tf
@@ -274,6 +274,112 @@ resource "aws_iam_role_policy_attachment" "airflow_worker_policy_attachment" {
policy_arn = aws_iam_policy.airflow_worker_policy.arn
}

resource "aws_efs_file_system" "airflow_kpo" {
creation_token = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfs-${local.counter}"

tags = merge(local.common_tags, {
Name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfs-${local.counter}"
Component = "airflow"
Stack = "airflow"
})
}

resource "aws_security_group" "airflow_kpo_efs" {
name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfsSg-${local.counter}"
description = "Security group for the EFS used in Airflow Kubernetes Pod Operators"
vpc_id = data.aws_eks_cluster.cluster.vpc_config[0].vpc_id

tags = merge(local.common_tags, {
Name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfsSg-${local.counter}"
Component = "airflow"
Stack = "airflow"
})
}

resource "aws_security_group_rule" "airflow_kpo_efs" {
type = "ingress"
from_port = 2049
to_port = 2049
protocol = "tcp"
security_group_id = aws_security_group.airflow_kpo_efs.id
cidr_blocks = [data.aws_vpc.cluster_vpc.cidr_block] # VPC CIDR to allow entire VPC. Adjust as necessary.
}

resource "aws_efs_mount_target" "airflow_kpo" {
for_each = nonsensitive(toset(jsondecode(data.aws_ssm_parameter.subnet_ids.value)["private"]))
file_system_id = aws_efs_file_system.airflow_kpo.id
subnet_id = each.value
security_groups = [aws_security_group.airflow_kpo_efs.id]
}

resource "aws_efs_access_point" "airflow_kpo" {
file_system_id = aws_efs_file_system.airflow_kpo.id
posix_user {
gid = 0
uid = 50000
}
root_directory {
path = "/efs"
creation_info {
owner_gid = 0
owner_uid = 50000
permissions = "0755"
}
}
tags = merge(local.common_tags, {
Name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfsAp-${local.counter}"
Component = "airflow"
Stack = "airflow"
})
}

# https://github.com/hashicorp/terraform-provider-kubernetes/issues/864
resource "kubernetes_storage_class" "efs" {
metadata {
name = "filestore"
}
reclaim_policy = "Retain"
storage_provisioner = "efs.csi.aws.com"
}

resource "kubernetes_persistent_volume" "efs_pv" {
metadata {
name = "kpo-efs"
}

spec {
capacity = {
storage = "5Gi"
}
access_modes = ["ReadWriteMany"]
persistent_volume_reclaim_policy = "Retain"
persistent_volume_source {
csi {
driver = "efs.csi.aws.com"
volume_handle = "${aws_efs_file_system.airflow_kpo.id}::${aws_efs_access_point.airflow_kpo.id}"
}
}
storage_class_name = kubernetes_storage_class.efs.metadata[0].name
}
}

resource "kubernetes_persistent_volume_claim" "efs_pvc" {
metadata {
name = "kpo-efs"
namespace = kubernetes_namespace.airflow.metadata[0].name
}
spec {
access_modes = ["ReadWriteMany"]
resources {
requests = {
storage = "5Gi"
}
}
volume_name = "kpo-efs"
storage_class_name = kubernetes_storage_class.efs.metadata[0].name
}
}

resource "helm_release" "airflow" {
name = "airflow"
repository = var.helm_charts.airflow.repository
@@ -289,6 +395,7 @@ resource "helm_release" "airflow" {
webserver_secret_name = "airflow-webserver-secret"
airflow_logs_s3_location = "s3://${aws_s3_bucket.airflow_logs.id}"
airflow_worker_role_arn = aws_iam_role.airflow_worker_role.arn
workers_pvc_name = kubernetes_persistent_volume_claim.efs_pvc.metadata[0].name
})
]
set_sensitive {
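Note: the Terraform above provisions the EFS file system, an access point rooted at /efs with POSIX user 50000:0, mount targets in each private subnet, and a ReadWriteMany PV/PVC (kpo-efs) that the Helm values mount at /shared-task-data on the workers. A hedged smoke test that a worker task can see and write to that mount — illustrative only, not part of this commit:

import os


def check_shared_mount(path="/shared-task-data"):
    # Hypothetical check: confirm the EFS-backed PVC is mounted and writable.
    st = os.statvfs(path)
    print(f"{path}: {st.f_bavail * st.f_frsize / 1024**2:.0f} MiB available")
    probe = os.path.join(path, ".mount_probe")
    with open(probe, "w") as f:
        f.write("ok\n")
    os.remove(probe)
    print(f"{path} is writable")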
