Commit

update pipeline
pymonger committed Feb 20, 2025
1 parent 897063d commit 5de6f7f
Showing 7 changed files with 133 additions and 56 deletions.
airflow/dags/edrgen.py (28 changes: 7 additions & 21 deletions)
@@ -37,9 +37,7 @@
             "s3://unity-gmanipon-ads-deployment-dev/srl/edrgen/static/srl/current/products/dp_980_v0.xml",
             type="string",
         ),
-        "output_url": Param(
-            "s3://unity-gmanipon-ads-deployment-dev/output", type="string"
-        ),
+        "output_url": Param("s3://unity-gmanipon-ads-deployment-dev/output", type="string"),
     },
 ) as dag:

@@ -72,12 +70,8 @@ def prep(params: dict):
                     preserve_file_name=True,
                     use_autogenerated_subdir=False,
                 )
-                edrgen_args[k] = os.path.join(
-                    f"/{i}", os.path.basename(parsed_url.path)
-                )
-            res = subprocess.run(
-                ["ls", "-l", dag_run_subdir], capture_output=True, text=True
-            )
+                edrgen_args[k] = os.path.join(f"/{i}", os.path.basename(parsed_url.path))
+            res = subprocess.run(["ls", "-l", dag_run_subdir], capture_output=True, text=True)
             print(res.stdout)
             print(res.stderr)
         finally:
@@ -117,22 +111,16 @@ def prep(params: dict):
         # container_security_context={"privileged": True},
         volume_mounts=[
             k8s.V1VolumeMount(
-                name="workers-volume",
-                mount_path="/stage-in",
-                sub_path="{{ dag_run.run_id }}/stage-in",
+                name="workers-volume", mount_path="/stage-in", sub_path="{{ dag_run.run_id }}/stage-in"
             ),
             k8s.V1VolumeMount(
-                name="workers-volume",
-                mount_path="/stage-out",
-                sub_path="{{ dag_run.run_id }}/stage-out",
+                name="workers-volume", mount_path="/stage-out", sub_path="{{ dag_run.run_id }}/stage-out"
             ),
         ],
         volumes=[
             k8s.V1Volume(
                 name="workers-volume",
-                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
-                    claim_name="airflow-kpo"
-                ),
+                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
             )
         ],
     )
@@ -149,9 +137,7 @@ def post(params: dict):
         output_urls = []
         for i in glob(os.path.join(stage_out_dir, "*.VIC")):
             dest_key = os.path.join(prefix, os.path.basename(i))
-            s3_hook.load_file(
-                bucket_name=bucket, key=dest_key, filename=i, replace=True
-            )
+            s3_hook.load_file(bucket_name=bucket, key=dest_key, filename=i, replace=True)
             print(f"Copying {i} to {dest_key}.")
             output_urls.append(f"s3://{bucket}/{dest_key}")
         return output_urls
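The post() hunk above is the whole stage-out story for this DAG: glob the shared /stage-out directory, upload each *.VIC product with S3Hook.load_file, and hand the resulting s3:// URLs back through XCom. A minimal standalone sketch of that pattern, assuming AWS credentials are already configured for the hook; the function name and the bucket/prefix arguments are placeholders, not project values:

import os
from glob import glob

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_stage_out(stage_out_dir: str, bucket: str, prefix: str) -> list:
    """Upload every *.VIC product in stage_out_dir; return the s3:// URLs."""
    s3_hook = S3Hook()
    output_urls = []
    for path in glob(os.path.join(stage_out_dir, "*.VIC")):
        dest_key = os.path.join(prefix, os.path.basename(path))
        # replace=True overwrites any earlier upload of the same key
        s3_hook.load_file(bucket_name=bucket, key=dest_key, filename=path, replace=True)
        output_urls.append(f"s3://{bucket}/{dest_key}")
    return output_urls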
airflow/dags/eval_srl_edrgen_readiness.py (11 changes: 3 additions & 8 deletions)
@@ -10,11 +10,7 @@
 from airflow.decorators import task
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils.trigger_rule import TriggerRule
-from airflow.operators.python import (
-    PythonOperator,
-    get_current_context,
-    ShortCircuitOperator,
-)
+from airflow.operators.python import PythonOperator, get_current_context, ShortCircuitOperator
 from airflow.models.param import Param
 from airflow.exceptions import AirflowFailException
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -66,9 +62,7 @@ def evaluate_edrgen(params: dict):
             keys["emd"] = (bucket, key)
             keys["dat"] = (bucket, f"{path}/{match.groupdict()['id']}.dat")
         else:
-            raise AirflowFailException(
-                f"Extension {match.groupdict()['ext']} not recognized."
-            )
+            raise AirflowFailException(f"Extension {match.groupdict()['ext']} not recognized.")
         keys["fsw"] = (
             "unity-gmanipon-ads-deployment-dev",
             f"srl/edrgen/static/srl/current/products/dp_{match.groupdict()['apid'][1:]}_v0.xml",
@@ -114,6 +108,7 @@ def edrgen_evaluation_successful():
             "dat_url": "{{ ti.xcom_pull(task_ids='evaluate_edrgen')['dat_url'] }}",
             "emd_url": "{{ ti.xcom_pull(task_ids='evaluate_edrgen')['emd_url'] }}",
             "fsw_url": "{{ ti.xcom_pull(task_ids='evaluate_edrgen')['fsw_url'] }}",
+            "output_url": "s3://gmanipon-dev-sps-isl/STACAM/VIC",
         },
     )

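Both readiness DAGs in this commit share a gate-then-trigger shape: an evaluation task returns a dict carrying a success flag plus the URLs the downstream DAG needs, a short-circuit task reads that dict from XCom and skips everything downstream when the flag is false, and a TriggerDagRunOperator forwards the URLs via templated conf. A hedged sketch of that shape in isolation; the dag_id, task names, and URL below are illustrative placeholders, not this repo's values:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(dag_id="gate_then_trigger_demo", schedule=None, start_date=datetime(2025, 1, 1)) as dag:

    @task
    def evaluate():
        # Stand-in for evaluate_edrgen(): report readiness plus the
        # parameters the downstream DAG will need.
        return {"success": True, "input_url": "s3://some-bucket/some-key"}

    @task.short_circuit()
    def gate(result: dict):
        # Returning False skips all downstream tasks, including the trigger.
        return result["success"]

    trigger = TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="downstream_dag",
        conf={"input_url": "{{ ti.xcom_pull(task_ids='evaluate')['input_url'] }}"},
    )

    gate(evaluate()) >> trigger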
airflow/dags/eval_srl_vic2png.py (95 changes: 95 additions & 0 deletions)
@@ -0,0 +1,95 @@
+import os
+import re
+import subprocess
+from datetime import datetime
+from urllib.parse import urlparse
+
+from kubernetes.client import models as k8s
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.operators.python import PythonOperator, get_current_context, ShortCircuitOperator
+from airflow.models.param import Param
+from airflow.exceptions import AirflowFailException
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+default_args = {"owner": "unity-sps", "start_date": datetime.utcfromtimestamp(0)}
+
+FNAME_RE = re.compile(
+    r"^(?P<id>(?P<instrument>SA|SB|FL|FR)(?P<color>[A-GJ-MORTX-Z_])(?P<specFlag>[A-Z_])(?P<primaryTime>\d{4})(?P<spacer0>[A-Z_])(?P<secondaryTime>\d{10})(?P<spacer1>_)(?P<tertiaryTime>\d{3})(?P<prodType>[A-Z_]{3})(?P<geometry>[NT])(?P<seqId>[A-Z]{3}[A-Z_]\d{5})(?P<downsample>[0-3_])(?P<compression>[A-Z0-9]{2})(?P<producer>[A-Z_])(?P<version>[A-Z0-9_]{2}))(?P<extension>\.VIC)$"
+)
+
+
+with DAG(
+    dag_id="eval_srl_vic2png",
+    default_args=default_args,
+    schedule=None,
+    tags=["eval_srl_vic2png"],
+    params={
+        "payload": Param(
+            "s3://unity-gmanipon-ads-deployment-dev/output/SAM_0000_0734432789_658ECMNAUT_040960LUJ01.VIC",
+            type="string",
+        )
+    },
+) as dag:
+
+    @task
+    def evaluate_vic2png(params: dict):
+        context = get_current_context()
+        dag_run_id = context["dag_run"].run_id
+        s3_hook = S3Hook()
+
+        # parse triggering payload
+        payload = params["payload"]
+        bucket, key = s3_hook.parse_s3_url(payload)
+        fname = os.path.basename(key)
+        path = os.path.dirname(key)
+
+        # ensure matches filename convention and parse filename components
+        match = FNAME_RE.search(fname)
+        if not match:
+            raise AirflowFailException(f"Filename {fname} not recognized.")
+
+        # build expected file prefixes (TODO: fix hardcoding)
+        keys = {"vic": (bucket, key)}
+
+        # default vic2png DAG parameters
+        vic2png_args = {"success": True, "vic_url": None, "id": match.groupdict()["id"]}
+
+        # check if all inputs exist
+        for k, v in keys.items():
+            exists = s3_hook.check_for_key(v[1], bucket_name=v[0])
+            vic2png_args["success"] &= exists
+            if exists:
+                vic2png_args[f"{k}_url"] = f"s3://{v[0]}/{v[1]}"
+
+        # return params and evaluation result
+        return vic2png_args
+
+    evaluate_vic2png_task = evaluate_vic2png()
+
+    @task.short_circuit()
+    def vic2png_evaluation_successful():
+        context = get_current_context()
+        print(f"{context['ti'].xcom_pull(task_ids='evaluate_vic2png')}")
+        return context["ti"].xcom_pull(task_ids="evaluate_vic2png")["success"]
+
+    vic2png_evaluation_successful_task = vic2png_evaluation_successful()
+
+    trigger_vic2png_task = TriggerDagRunOperator(
+        task_id="trigger_vic2png",
+        trigger_dag_id="vic2png",
+        # use the product ID as the run_id to dedup dagRuns for a particular ID
+        trigger_run_id="{{ ti.xcom_pull(task_ids='evaluate_vic2png')['id'] }}",
+        trigger_rule=TriggerRule.ALL_SUCCESS,
+        skip_when_already_exists=True,
+        conf={
+            "vic_url": "{{ ti.xcom_pull(task_ids='evaluate_vic2png')['vic_url'] }}",
+            "output_url": "s3://gmanipon-dev-sps-isl/STACAM/PNG",
+        },
+    )
+
+    evaluate_vic2png_task >> vic2png_evaluation_successful_task >> trigger_vic2png_task
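A quick check of the filename convention FNAME_RE enforces, run against the DAG's default payload name; the regex is copied verbatim from the file above, so the snippet stands alone:

import re

FNAME_RE = re.compile(
    r"^(?P<id>(?P<instrument>SA|SB|FL|FR)(?P<color>[A-GJ-MORTX-Z_])(?P<specFlag>[A-Z_])(?P<primaryTime>\d{4})(?P<spacer0>[A-Z_])(?P<secondaryTime>\d{10})(?P<spacer1>_)(?P<tertiaryTime>\d{3})(?P<prodType>[A-Z_]{3})(?P<geometry>[NT])(?P<seqId>[A-Z]{3}[A-Z_]\d{5})(?P<downsample>[0-3_])(?P<compression>[A-Z0-9]{2})(?P<producer>[A-Z_])(?P<version>[A-Z0-9_]{2}))(?P<extension>\.VIC)$"
)

match = FNAME_RE.search("SAM_0000_0734432789_658ECMNAUT_040960LUJ01.VIC")
assert match is not None
print(match.group("instrument"))     # SA
print(match.group("secondaryTime"))  # 0734432789
print(match.group("id"))             # SAM_0000_0734432789_658ECMNAUT_040960LUJ01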
airflow/dags/vic2png.py (34 changes: 8 additions & 26 deletions)
@@ -29,9 +29,7 @@
             "s3://unity-gmanipon-ads-deployment-dev/output/SAM_0000_0734432789_658ECMNAUT_040960LUJ01.VIC",
             type="string",
         ),
-        "output_url": Param(
-            "s3://unity-gmanipon-ads-deployment-dev/output", type="string"
-        ),
+        "output_url": Param("s3://unity-gmanipon-ads-deployment-dev/output", type="string"),
     },
 ) as dag:
@@ -64,21 +62,13 @@ def prep(params: dict):
                     preserve_file_name=True,
                     use_autogenerated_subdir=False,
                 )
-                vic2png_args[k] = os.path.join(
-                    f"/{i}", os.path.basename(parsed_url.path)
-                )
-            res = subprocess.run(
-                ["ls", "-l", dag_run_subdir], capture_output=True, text=True
-            )
+                vic2png_args[k] = os.path.join(f"/{i}", os.path.basename(parsed_url.path))
+            res = subprocess.run(["ls", "-l", dag_run_subdir], capture_output=True, text=True)
             print(res.stdout)
             print(res.stderr)
         finally:
             os.umask(original_umask)
-        cli_args = [
-            vic2png_args["vic_url"],
-            "--out",
-            "/stage-out",
-        ]
+        cli_args = [vic2png_args["vic_url"], "--out", "/stage-out"]
         res = subprocess.run(["find", dag_run_dir], capture_output=True, text=True)
         print(res.stdout)
         print(res.stderr)
@@ -107,22 +97,16 @@ def prep(params: dict):
         # container_security_context={"privileged": True},
         volume_mounts=[
             k8s.V1VolumeMount(
-                name="workers-volume",
-                mount_path="/stage-in",
-                sub_path="{{ dag_run.run_id }}/stage-in",
+                name="workers-volume", mount_path="/stage-in", sub_path="{{ dag_run.run_id }}/stage-in"
             ),
             k8s.V1VolumeMount(
-                name="workers-volume",
-                mount_path="/stage-out",
-                sub_path="{{ dag_run.run_id }}/stage-out",
+                name="workers-volume", mount_path="/stage-out", sub_path="{{ dag_run.run_id }}/stage-out"
             ),
         ],
         volumes=[
             k8s.V1Volume(
                 name="workers-volume",
-                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
-                    claim_name="airflow-kpo"
-                ),
+                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
             )
         ],
     )
@@ -139,9 +123,7 @@ def post(params: dict):
         output_urls = []
         for i in glob(os.path.join(stage_out_dir, "*.png")):
             dest_key = os.path.join(prefix, os.path.basename(i))
-            s3_hook.load_file(
-                bucket_name=bucket, key=dest_key, filename=i, replace=True
-            )
+            s3_hook.load_file(bucket_name=bucket, key=dest_key, filename=i, replace=True)
             print(f"Copying {i} to {dest_key}.")
             output_urls.append(f"s3://{bucket}/{dest_key}")
         return output_urls
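Both edrgen and vic2png mount the same "workers-volume" PVC (claim "airflow-kpo") twice, keying each mount's sub_path on dag_run.run_id so concurrent runs get isolated /stage-in and /stage-out directories on the shared volume. A sketch of that construction with the run_id already resolved; the helper function is illustrative, while the volume and claim names mirror the diffs above:

from kubernetes.client import models as k8s


def run_scoped_mounts(run_id: str) -> list:
    # One mount per direction, isolated per run via sub_path.
    return [
        k8s.V1VolumeMount(
            name="workers-volume",
            mount_path=f"/stage-{direction}",
            sub_path=f"{run_id}/stage-{direction}",
        )
        for direction in ("in", "out")
    ]


workers_volume = k8s.V1Volume(
    name="workers-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
)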
terraform-unity/.terraform.lock.hcl (11 changes: 10 additions & 1 deletion)

Some generated files are not rendered by default.