From 7cf83892bb1f76c2af1fcb39574c64cb31cc8821 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 5 Mar 2024 15:48:01 -0800 Subject: [PATCH 1/7] Volume mount for dag --- airflow/dags/sbg_preprocess_cwl_dag.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index b9012954..82f62afe 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -86,9 +86,18 @@ def setup(ti=None, **context): task_id="SBG_Preprocess_CWL", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))), pod_template_file=POD_TEMPLATE_FILE, - arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"], + arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", "/scratch"], # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, dag=dag, + volumes=[ + { + "name": "kubernetes-pod-operator-ebs-volume", + "persistentVolumeClaim": {"claimName": "kubernetes-pod-operator-ebs-pvc"}, + } + ], + volume_mounts=[ + {"name": "kubernetes-pod-operator-ebs-volume", "mountPath": "/scratch", "readOnly": False} + ], ) setup_task >> cwl_task From abc0ea914005133f19d1759edacf7ee4e46ad5aa Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 6 Mar 2024 10:08:53 -0800 Subject: [PATCH 2/7] Volume mount testing in dag --- airflow/dags/hello_world.py | 66 ++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/airflow/dags/hello_world.py b/airflow/dags/hello_world.py index 50d0a080..b8971e86 100644 --- a/airflow/dags/hello_world.py +++ b/airflow/dags/hello_world.py @@ -1,11 +1,4 @@ -""" -# DAG Name: Hello World - -# Purpose - -# Usage -""" # noqa: E501 - +import os import time from datetime import datetime @@ -24,13 +17,62 @@ def hello_world(): time.sleep(30) +def write_to_shared_data(): + file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + with open(file_path, "w") as f: + f.write("This is a test file written at " + str(datetime.now()) + "\n") + print(f"Successfully written to {file_path}") + + +def read_from_shared_data(): + file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + try: + with open(file_path, "r") as f: + contents = f.read() + print(f"File contents:\n{contents}") + except FileNotFoundError: + print("File not found. Make sure the file path is correct.") + + +def delete_shared_data_file(): + file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + try: + os.remove(file_path) + print(f"Successfully deleted {file_path}") + except FileNotFoundError: + print("File not found. 
Make sure the file path is correct.") + + with DAG( - dag_id="hello_world", - doc_md=__doc__, + dag_id="full_workflow", default_args=default_args, schedule=None, is_paused_upon_creation=False, tags=["test"], ) as dag: - hello_world_task = PythonOperator(task_id="hello_world", python_callable=hello_world) - hello_world_task + hello_world_task = PythonOperator( + task_id="hello_world", + python_callable=hello_world, + ) + + write_to_shared_data_task = PythonOperator( + task_id="write_to_shared_data", + python_callable=write_to_shared_data, + ) + + read_from_shared_data_task = PythonOperator( + task_id="read_from_shared_data", + python_callable=read_from_shared_data, + ) + + delete_shared_data_file_task = PythonOperator( + task_id="delete_shared_data_file", + python_callable=delete_shared_data_file, + ) + + ( + hello_world_task + >> write_to_shared_data_task + >> read_from_shared_data_task + >> delete_shared_data_file_task + ) From 4d8b04ea59c21cde42b2a6f7bfdfb1d83c52aab0 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 6 Mar 2024 11:03:55 -0800 Subject: [PATCH 3/7] Initial debug pvc mount in dag --- airflow/dags/hello_world.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/airflow/dags/hello_world.py b/airflow/dags/hello_world.py index b8971e86..43fd79ab 100644 --- a/airflow/dags/hello_world.py +++ b/airflow/dags/hello_world.py @@ -1,3 +1,11 @@ +""" +# DAG Name: Hello World + +# Purpose + +# Usage +""" # noqa: E501 + import os import time from datetime import datetime @@ -18,14 +26,14 @@ def hello_world(): def write_to_shared_data(): - file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + file_path = "/shared-task-data/test_file.txt" # Adjust the path as necessary with open(file_path, "w") as f: f.write("This is a test file written at " + str(datetime.now()) + "\n") print(f"Successfully written to {file_path}") def read_from_shared_data(): - file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + file_path = "/shared-task-data/test_file.txt" # Adjust the path as necessary try: with open(file_path, "r") as f: contents = f.read() @@ -35,7 +43,7 @@ def read_from_shared_data(): def delete_shared_data_file(): - file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + file_path = "/shared-task-data/test_file.txt" # Adjust the path as necessary try: os.remove(file_path) print(f"Successfully deleted {file_path}") From 7b9951087af0730fc90da80423a0f9f11a2b84a3 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 6 Mar 2024 14:52:22 -0800 Subject: [PATCH 4/7] Support of pvc mounting --- airflow/dags/hello_world.py | 2 +- airflow/dags/sbg_preprocess_cwl_dag.py | 43 +++++-- airflow/helm/values.tmpl.yaml | 8 ++ .../terraform-unity-sps-airflow/README.md | 9 ++ .../terraform-unity-sps-airflow/data.tf | 4 + .../terraform-unity-sps-airflow/main.tf | 106 ++++++++++++++++++ 6 files changed, 162 insertions(+), 10 deletions(-) diff --git a/airflow/dags/hello_world.py b/airflow/dags/hello_world.py index 43fd79ab..c8cb7a3a 100644 --- a/airflow/dags/hello_world.py +++ b/airflow/dags/hello_world.py @@ -52,7 +52,7 @@ def delete_shared_data_file(): with DAG( - dag_id="full_workflow", + dag_id="hello_world", default_args=default_args, schedule=None, is_paused_upon_creation=False, diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index 82f62afe..c8132c88 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ 
b/airflow/dags/sbg_preprocess_cwl_dag.py
@@ -1,11 +1,13 @@
 # DAG for executing the SBG Preprocess Workflow
 # See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl
 import json
+import os
+import shutil
 import uuid
 from datetime import datetime
 
 from airflow.models.param import Param
-from airflow.operators.python import PythonOperator
+from airflow.operators.python import PythonOperator, get_current_context
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
 from kubernetes.client import models as k8s
 
@@ -57,6 +59,11 @@
 # Task that serializes the job arguments into a JSON string
 def setup(ti=None, **context):
+    context = get_current_context()
+    dag_run_id = context["dag_run"].run_id
+    local_dir = f"/shared-task-data/{dag_run_id}"
+    os.makedirs(local_dir, exist_ok=True)
+
     task_dict = {
         "input_processing_labels": ["label1", "label2"],
         "input_cmr_stac": context["params"]["input_cmr_stac"],
@@ -89,15 +96,33 @@
     arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", "/scratch"],
     # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
     dag=dag,
-    volumes=[
-        {
-            "name": "kubernetes-pod-operator-ebs-volume",
-            "persistentVolumeClaim": {"claimName": "kubernetes-pod-operator-ebs-pvc"},
-        }
-    ],
     volume_mounts=[
-        {"name": "kubernetes-pod-operator-ebs-volume", "mountPath": "/scratch", "readOnly": False}
+        k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}"),
     ],
+    volumes=[
+        k8s.V1Volume(
+            name="workers-volume",
+            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="kpo-efs"),
+        )
+    ],
+)
+
+
+def cleanup(**context):
+    dag_run_id = context["dag_run"].run_id
+    local_dir = f"/shared-task-data/{dag_run_id}"
+    if os.path.exists(local_dir):
+        shutil.rmtree(local_dir)
+        print(f"Deleted directory: {local_dir}")
+    else:
+        print(f"Directory does not exist, no need to delete: {local_dir}")
+
+
+cleanup_task = PythonOperator(
+    task_id="Cleanup",
+    python_callable=cleanup,
+    dag=dag,
 )
 
-setup_task >> cwl_task
+
+setup_task >> cwl_task >> cleanup_task
diff --git a/airflow/helm/values.tmpl.yaml b/airflow/helm/values.tmpl.yaml
index 52ce8abb..d17831e8 100644
--- a/airflow/helm/values.tmpl.yaml
+++ b/airflow/helm/values.tmpl.yaml
@@ -88,6 +88,14 @@ workers:
   serviceAccount:
     annotations:
       eks.amazonaws.com/role-arn: "${airflow_worker_role_arn}"
+  extraVolumes:
+    - name: workers-volume
+      persistentVolumeClaim:
+        claimName: ${workers_pvc_name}
+  extraVolumeMounts:
+    - name: workers-volume
+      mountPath: /shared-task-data
+      readOnly: false
 
 data:
   metadataSecretName: ${metadata_secret_name}
diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/README.md b/terraform-unity/modules/terraform-unity-sps-airflow/README.md
index 38a59098..f6e17228 100644
--- a/terraform-unity/modules/terraform-unity-sps-airflow/README.md
+++ b/terraform-unity/modules/terraform-unity-sps-airflow/README.md
@@ -32,13 +32,18 @@ No modules.
|------|------| | [aws_db_instance.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_instance) | resource | | [aws_db_subnet_group.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_subnet_group) | resource | +| [aws_efs_access_point.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_access_point) | resource | +| [aws_efs_file_system.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_file_system) | resource | +| [aws_efs_mount_target.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_mount_target) | resource | | [aws_iam_policy.airflow_worker_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource | | [aws_iam_role.airflow_worker_role](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource | | [aws_iam_role_policy_attachment.airflow_worker_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | | [aws_s3_bucket.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | | [aws_secretsmanager_secret.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret) | resource | | [aws_secretsmanager_secret_version.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret_version) | resource | +| [aws_security_group.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource | | [aws_security_group.rds_sg](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource | +| [aws_security_group_rule.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [aws_security_group_rule.eks_egress_to_rds](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [aws_security_group_rule.rds_ingress_from_eks](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [helm_release.airflow](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource | @@ -48,11 +53,14 @@ No modules. 
| [kubernetes_ingress_v1.ogc_processes_api_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/ingress_v1) | resource | | [kubernetes_namespace.airflow](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/namespace) | resource | | [kubernetes_namespace.keda](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/namespace) | resource | +| [kubernetes_persistent_volume.efs_pv](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/persistent_volume) | resource | +| [kubernetes_persistent_volume_claim.efs_pvc](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/persistent_volume_claim) | resource | | [kubernetes_role.airflow_pod_creator](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/role) | resource | | [kubernetes_role_binding.airflow_pod_creator_binding](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/role_binding) | resource | | [kubernetes_secret.airflow_metadata](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource | | [kubernetes_secret.airflow_webserver](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource | | [kubernetes_service.ogc_processes_api](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/service) | resource | +| [kubernetes_storage_class.nfs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource | | [null_resource.remove_finalizers](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource | | [random_id.airflow_webserver_secret](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | | [random_id.counter](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | @@ -62,6 +70,7 @@ No modules. 
| [aws_eks_cluster_auth.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster_auth) | data source | | [aws_security_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/security_group) | data source | | [aws_ssm_parameter.subnet_ids](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/ssm_parameter) | data source | +| [aws_vpc.cluster_vpc](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/vpc) | data source | | [kubernetes_ingress_v1.airflow_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source | | [kubernetes_ingress_v1.ogc_processes_api_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source | diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/data.tf b/terraform-unity/modules/terraform-unity-sps-airflow/data.tf index 7fd5c83f..f15f24fe 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/data.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/data.tf @@ -8,6 +8,10 @@ data "aws_eks_cluster_auth" "cluster" { name = var.eks_cluster_name } +data "aws_vpc" "cluster_vpc" { + id = data.aws_eks_cluster.cluster.vpc_config[0].vpc_id +} + data "aws_ssm_parameter" "subnet_ids" { name = "/unity/cs/account/network/subnet_list" } diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index 4ed52485..77ee2ea4 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -274,6 +274,111 @@ resource "aws_iam_role_policy_attachment" "airflow_worker_policy_attachment" { policy_arn = aws_iam_policy.airflow_worker_policy.arn } +resource "aws_efs_file_system" "airflow_kpo" { + creation_token = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfs-${local.counter}" + + tags = merge(local.common_tags, { + Name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfs-${local.counter}" + Component = "airflow" + Stack = "airflow" + }) +} + +resource "aws_security_group" "airflow_kpo_efs" { + name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfsSg-${local.counter}" + description = "Security group for the EFS used in Airflow Kubernetes Pod Operators" + vpc_id = data.aws_eks_cluster.cluster.vpc_config[0].vpc_id + + tags = merge(local.common_tags, { + Name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfsSg-${local.counter}" + Component = "airflow" + Stack = "airflow" + }) +} + +resource "aws_security_group_rule" "airflow_kpo_efs" { + type = "ingress" + from_port = 2049 # NFS port + to_port = 2049 + protocol = "tcp" + security_group_id = aws_security_group.airflow_kpo_efs.id + cidr_blocks = [data.aws_vpc.cluster_vpc.cidr_block] # VPC CIDR to allow entire VPC. Adjust as necessary. 
+} + +resource "aws_efs_mount_target" "airflow_kpo" { + for_each = nonsensitive(toset(jsondecode(data.aws_ssm_parameter.subnet_ids.value)["private"])) + file_system_id = aws_efs_file_system.airflow_kpo.id + subnet_id = each.value + security_groups = [aws_security_group.airflow_kpo_efs.id] +} + +resource "aws_efs_access_point" "airflow_kpo" { + file_system_id = aws_efs_file_system.airflow_kpo.id + posix_user { + gid = 0 + uid = 50000 + } + root_directory { + path = "/efs" + creation_info { + owner_gid = 0 + owner_uid = 50000 + permissions = "0755" + } + } + tags = merge(local.common_tags, { + Name = "${var.project}-${var.venue}-${var.service_area}-AirflowKpoEfsAp-${local.counter}" + Component = "airflow" + Stack = "airflow" + }) +} + +resource "kubernetes_storage_class" "nfs" { + metadata { + name = "filestore" + } + reclaim_policy = "Retain" + storage_provisioner = "efs.csi.aws.com" +} + +resource "kubernetes_persistent_volume" "efs_pv" { + metadata { + name = "kpo-efs" + } + + spec { + capacity = { + storage = "5Gi" + } + access_modes = ["ReadWriteMany"] + persistent_volume_reclaim_policy = "Retain" + persistent_volume_source { + csi { + driver = "efs.csi.aws.com" + volume_handle = "${aws_efs_file_system.airflow_kpo.id}::${aws_efs_access_point.airflow_kpo.id}" + } + } + storage_class_name = kubernetes_storage_class.nfs.metadata[0].name + } +} + +resource "kubernetes_persistent_volume_claim" "efs_pvc" { + metadata { + name = "kpo-efs" + namespace = kubernetes_namespace.airflow.metadata[0].name + } + spec { + access_modes = ["ReadWriteMany"] + resources { + requests = { + storage = "5Gi" + } + } + volume_name = "kpo-efs" + storage_class_name = kubernetes_storage_class.nfs.metadata[0].name + } +} + resource "helm_release" "airflow" { name = "airflow" repository = var.helm_charts.airflow.repository @@ -289,6 +394,7 @@ resource "helm_release" "airflow" { webserver_secret_name = "airflow-webserver-secret" airflow_logs_s3_location = "s3://${aws_s3_bucket.airflow_logs.id}" airflow_worker_role_arn = aws_iam_role.airflow_worker_role.arn + workers_pvc_name = kubernetes_persistent_volume_claim.efs_pvc.metadata[0].name }) ] set_sensitive { From faa295cb1051373102166e97c852355c2bda1ad0 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 6 Mar 2024 15:04:52 -0800 Subject: [PATCH 5/7] Debug volume mount in dag --- airflow/dags/sbg_preprocess_cwl_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py index c8132c88..a4549c9c 100644 --- a/airflow/dags/sbg_preprocess_cwl_dag.py +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -97,7 +97,7 @@ def setup(ti=None, **context): # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, dag=dag, volume_mounts=[ - k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}"), + k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}") ], volumes=[ k8s.V1Volume( From b1662cb815102985746e5fbad5a6a3c9f3b236a1 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 6 Mar 2024 15:48:52 -0800 Subject: [PATCH 6/7] Rename tf resource --- .../modules/terraform-unity-sps-airflow/README.md | 2 +- .../modules/terraform-unity-sps-airflow/main.tf | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/README.md b/terraform-unity/modules/terraform-unity-sps-airflow/README.md index f6e17228..086c0bfe 100644 --- 
a/terraform-unity/modules/terraform-unity-sps-airflow/README.md +++ b/terraform-unity/modules/terraform-unity-sps-airflow/README.md @@ -60,7 +60,7 @@ No modules. | [kubernetes_secret.airflow_metadata](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource | | [kubernetes_secret.airflow_webserver](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource | | [kubernetes_service.ogc_processes_api](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/service) | resource | -| [kubernetes_storage_class.nfs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource | +| [kubernetes_storage_class.efs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource | | [null_resource.remove_finalizers](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource | | [random_id.airflow_webserver_secret](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | | [random_id.counter](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index 77ee2ea4..7accb3da 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -298,7 +298,7 @@ resource "aws_security_group" "airflow_kpo_efs" { resource "aws_security_group_rule" "airflow_kpo_efs" { type = "ingress" - from_port = 2049 # NFS port + from_port = 2049 to_port = 2049 protocol = "tcp" security_group_id = aws_security_group.airflow_kpo_efs.id @@ -333,7 +333,8 @@ resource "aws_efs_access_point" "airflow_kpo" { }) } -resource "kubernetes_storage_class" "nfs" { +# https://github.com/hashicorp/terraform-provider-kubernetes/issues/864 +resource "kubernetes_storage_class" "efs" { metadata { name = "filestore" } @@ -358,7 +359,7 @@ resource "kubernetes_persistent_volume" "efs_pv" { volume_handle = "${aws_efs_file_system.airflow_kpo.id}::${aws_efs_access_point.airflow_kpo.id}" } } - storage_class_name = kubernetes_storage_class.nfs.metadata[0].name + storage_class_name = kubernetes_storage_class.efs.metadata[0].name } } @@ -375,7 +376,7 @@ resource "kubernetes_persistent_volume_claim" "efs_pvc" { } } volume_name = "kpo-efs" - storage_class_name = kubernetes_storage_class.nfs.metadata[0].name + storage_class_name = kubernetes_storage_class.efs.metadata[0].name } } From fbbfd95a717874ed8037eb06219118b74185b9b4 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 7 Mar 2024 08:44:39 -0700 Subject: [PATCH 7/7] [CHANGE] Using /scratch as working directory, not output directory. 
---
 airflow/docker/cwl/docker_cwl_entrypoint.sh | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/airflow/docker/cwl/docker_cwl_entrypoint.sh b/airflow/docker/cwl/docker_cwl_entrypoint.sh
index f3f1b01d..3beac4a2 100755
--- a/airflow/docker/cwl/docker_cwl_entrypoint.sh
+++ b/airflow/docker/cwl/docker_cwl_entrypoint.sh
@@ -7,13 +7,13 @@
 # (example: { "name": "John Doe" })
 # OR b) The URL of a YAML or JSON file containing the job parameters
 # (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.dev.yml)
-# $3: optional output directory, defaults to the current directory
-# Note: $output_dir must be accessible by the Docker container that executes this script
+# $3: optional working directory, defaults to the current directory
+# Note: The working directory must be accessible by the Docker container that executes this script
 
 set -e
 cwl_workflow=$1
 job_args=$2
-output_dir=${3:-.}
+work_dir=${3:-.}
 
 # switch between the 2 cases a) and b) for job_args
 if [ "$job_args" = "${job_args#{}" ]
@@ -25,10 +25,11 @@ else
   echo "Using job arguments from JSON string:" && cat /tmp/job_args.json
   job_args="/tmp/job_args.json"
 fi
-echo "Executing the CWL workflow: $cwl_workflow with json arguments: $job_args and output directory: $output_dir"
+echo "Executing the CWL workflow: $cwl_workflow with json arguments: $job_args and working directory: $work_dir"
 
-# create output directory if it doesn't exist
-mkdir -p "$output_dir"
+# create working directory if it doesn't exist
+mkdir -p "$work_dir"
+cd "$work_dir"
 
 # Start Docker engine
 dockerd &> dockerd-logfile &
@@ -42,7 +43,7 @@ done
 
 # Execute CWL workflow
 . /usr/share/cwl/venv/bin/activate
-cwl-runner --outdir "$output_dir" "$cwl_workflow" "$job_args"
+cwl-runner "$cwl_workflow" "$job_args"
 deactivate
 
 # Stop Docker engine