Commit c20f275
Add batched_update dag (#2331)
* Initial pass at batched update DAG
* Refactor into separate files
* Allow for easier testing by passing through postgres_conn_id
* Pull SQL test utils out into shared file
* Test batched update DAG
* Clean up variable names, only notify Slack when not dry_run
* Reorder params
* Undo testing value
* Remove TODOs
* Generate dag documentation
* Add options for resuming without recreating temp table
* Respect dry_run in expected_count task
* Update dag docs
* Update test
* Handle explicitly passed None batch_start
* Update comment, variable
1 parent 0118f50 commit c20f275

File tree

7 files changed: +1033 -179 lines changed

catalog/DAGs.md

Lines changed: 76 additions & 0 deletions
@@ -42,6 +42,7 @@ The following are DAGs grouped by their primary tag:

| DAG ID | Schedule Interval |
| --------------------------------------------------------------------------------- | ----------------- |
| [`batched_update`](#batched_update) | `None` |
| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` |
| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` |
| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` |
@@ -112,6 +113,7 @@ The following is documentation associated with each DAG (where available):

1. [`add_license_url`](#add_license_url)
1. [`airflow_log_cleanup`](#airflow_log_cleanup)
1. [`audio_data_refresh`](#audio_data_refresh)
1. [`batched_update`](#batched_update)
1. [`check_silenced_dags`](#check_silenced_dags)
1. [`create_filtered_audio_index`](#create_filtered_audio_index)
1. [`create_filtered_image_index`](#create_filtered_image_index)
@@ -219,6 +221,80 @@ and related PRs:

- [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353)
- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453)

## `batched_update`

### Batched Update DAG

This DAG is used to run a batched SQL update on a media table in the Catalog
database. It is automatically triggered by the `popularity_refresh` DAGs to
refresh popularity data using newly calculated constants, but it can also be
triggered manually with custom SQL operations.

The DAG must be run with a valid dag_run configuration specifying the SQL
commands to be run. The DAG will then split the rows to be updated into batches
and report to Slack when all batches have been updated. It handles all
deadlocking and timeout concerns, ensuring that the provided SQL runs without
interfering with ingestion. For more information, see the implementation plan:
https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html#special-considerations-avoiding-deadlocks-and-timeouts
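To make the batching strategy concrete, the following sketch shows the general
shape of the two statements involved (illustrative only; these are not the
actual templates from this commit's `constants` module). The selected rows are
first snapshotted into a temp table keyed by a monotonically increasing
`row_id`, and the update is then applied in fixed-size `row_id` ranges so that
each statement locks only a bounded number of rows:

```
# Illustrative sketch -- names and exact SQL are assumptions, not the
# commit's actual templates.
CREATE_TEMP_TABLE = """
    CREATE TABLE {temp_table_name} AS
    SELECT ROW_NUMBER() OVER() AS row_id, identifier
    FROM {table_name}
    {select_query};
"""

UPDATE_BATCH = """
    UPDATE {table_name}
    {update_query}
    WHERE identifier IN (
        SELECT identifier FROM {temp_table_name}
        WHERE row_id > {batch_start} AND row_id <= {batch_end}
    );
"""
```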
By default the DAG will run as a dry_run, logging the generated SQL but not
actually running it. To actually perform the update, the `dry_run` parameter
must be explicitly set to `false` in the configuration.

Required Dagrun Configuration parameters:

- query_id: a string identifier which will be appended to the name of the
  temporary table used in the update
- table_name: the name of the table to update. Must be a valid media table
- select_query: a SQL `WHERE` clause used to select the rows that will be
  updated
- update_query: the SQL `UPDATE` expression to be run on all selected rows

Optional params:

- dry_run: bool, whether to actually run the generated SQL. True by default.
- batch_size: int number of records to process in each batch. By default,
  10_000
- update_timeout: int number of seconds to run an individual batch update
  before timing out. By default, 3600 (one hour)
- batch_start: int index into the temp table at which to start the update. By
  default, this is 0 and all rows in the temp table are updated.
- resume_update: boolean indicating whether to attempt to resume an update
  using an existing temp table matching the `query_id`. When True, a new temp
  table is not created.
An example dag_run configuration used to set the thumbnails of all Flickr
images to null would look like this:

```
{
    "query_id": "my_flickr_query",
    "table_name": "image",
    "select_query": "WHERE provider='flickr'",
    "update_query": "SET thumbnail=null",
    "batch_size": 10,
    "dry_run": false
}
```
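This JSON can be supplied as the dag_run configuration when triggering the DAG
manually, for example via the "Trigger DAG w/ config" option in the Airflow web
UI or with the `airflow dags trigger batched_update --conf '<json>'` CLI
command.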
It is possible to resume an update from an arbitrary starting point on an
existing temp table, for example if a DAG succeeds in creating the temp table
but fails midway through the update. To do so, set the `resume_update` param to
True and select your desired `batch_start`. For instance, if the example DAG
given above failed after processing the first 50_000 records, you might run:

```
{
    "query_id": "my_flickr_query",
    "table_name": "image",
    "select_query": "WHERE provider='flickr'",
    "update_query": "SET thumbnail=null",
    "batch_size": 10,
    "batch_start": 50000,
    "resume_update": true,
    "dry_run": false
}
```
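With this configuration the DAG skips temp table creation entirely, re-counts
the rows in the existing temp table for `my_flickr_query`, subtracts the
50_000 already-processed rows from the expected count, and resumes batching at
row 50_000 (see `resume_update` and `get_expected_update_count` in the task
definitions below).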
## `check_silenced_dags`

### Silenced DAGs check
Lines changed: 147 additions & 0 deletions
@@ -0,0 +1,147 @@
import logging
from datetime import timedelta

from airflow.decorators import task
from airflow.models.abstractoperator import AbstractOperator

from common import slack
from common.constants import POSTGRES_CONN_ID
from common.sql import PostgresHook
from database.batched_update import constants


logger = logging.getLogger(__name__)


def _single_value(cursor):
    # Handler that extracts a single scalar (e.g. a COUNT) from the cursor.
    try:
        row = cursor.fetchone()
        return row[0]
    except Exception as e:
        raise ValueError("Unable to extract expected row data from cursor") from e


@task.branch
def resume_update(
    resume_update: bool,
):
    """
    Branch to skip temp table creation when this DagRun is resuming from an
    existing temp table, or to create the table otherwise.
    """
    if resume_update:
        # Skip table creation and indexing
        return constants.GET_EXPECTED_COUNT_TASK_ID
    return constants.CREATE_TEMP_TABLE_TASK_ID


@task
def get_expected_update_count(query_id: str, batch_start: int | None, dry_run: bool):
    """
    Get the number of records left to update, when resuming an update
    on an existing temp table.
    """
    total_count = run_sql.function(
        dry_run=dry_run,
        sql_template=constants.SELECT_TEMP_TABLE_QUERY,
        query_id=query_id,
        handler=_single_value,
    )

    if batch_start:
        total_count -= batch_start
    return max(total_count, 0)


@task
def run_sql(
    dry_run: bool,
    sql_template: str,
    query_id: str,
    log_sql: bool = True,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: timedelta = None,
    handler: callable = constants.RETURN_ROW_COUNT,
    **kwargs,
):
    query = sql_template.format(
        temp_table_name=constants.TEMP_TABLE_NAME.format(query_id=query_id), **kwargs
    )
    if dry_run:
        logger.info(
            "This is a dry run: no SQL will be executed. To perform the updates,"
            " rerun the DAG with the conf option `'dry_run': false`."
        )
        logger.info(query)
        return 0

    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
        log_sql=log_sql,
    )

    return postgres.run(query, handler=handler)


@task
def update_batches(
    expected_row_count: int,
    batch_size: int,
    dry_run: bool,
    table_name: str,
    query_id: str,
    update_query: str,
    update_timeout: int,
    batch_start: int = 0,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    **kwargs,
):
    updated_count = 0
    if batch_start is None:
        batch_start = 0
    # Remember where we started so only the first batch's query is logged.
    first_batch_start = batch_start

    while batch_start <= expected_row_count:
        batch_end = batch_start + batch_size

        logger.info(f"Updating rows with id {batch_start} through {batch_end}.")
        count = run_sql.function(
            dry_run=dry_run,
            sql_template=constants.UPDATE_BATCH_QUERY,
            query_id=query_id,
            # Only log the query the first time, so as not to spam the logs
            log_sql=batch_start == first_batch_start,
            postgres_conn_id=postgres_conn_id,
            task=task,
            timeout=update_timeout,
            table_name=table_name,
            update_query=update_query,
            batch_start=batch_start,
            batch_end=batch_end,
        )

        updated_count += count
        batch_start = batch_end
        logger.info(
            f"Updated {updated_count} rows. {expected_row_count - updated_count}"
            " remaining."
        )

    return updated_count


@task
def notify_slack(text: str, dry_run: bool) -> None:
    # Report the result to Slack, or only log it during a dry run.
    if not dry_run:
        slack.send_message(
            text,
            username=constants.SLACK_USERNAME,
            icon_emoji=constants.SLACK_ICON,
            dag_id=constants.DAG_ID,
        )
    else:
        logger.info(text)
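Note that `get_expected_update_count` and `update_batches` call
`run_sql.function(...)`: the `.function` attribute of a TaskFlow-decorated
task is the original, undecorated callable, so the SQL helper runs inline
within the calling task rather than as a separate Airflow task. This also
makes the helpers easy to exercise locally. A minimal dry-run sketch follows
(not part of the commit; the import path is an assumption, and it relies only
on `constants.UPDATE_BATCH_QUERY` accepting the kwargs shown above):

```
# Import path is an assumption based on this file's own imports.
from database.batched_update.batched_update import update_batches

# Logs the generated SQL for three batches (rows 0-10, 10-20, 20-30) and
# returns 0, because dry_run=True makes run_sql log the query and
# short-circuit instead of executing anything against Postgres.
update_batches.function(
    expected_row_count=25,
    batch_size=10,
    dry_run=True,
    table_name="image",
    query_id="my_flickr_query",
    update_query="SET thumbnail=null",
    update_timeout=3600,
    batch_start=0,
)
```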
