-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsample_ch_ddl.py
66 lines (53 loc) · 1.69 KB
/
sample_ch_ddl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import datetime
import pathlib
from airflow.utils.trigger_rule import TriggerRule
from airflow.decorators import dag
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators import sql
from airflow_clickhouse_plugin.operators.clickhouse_dbapi import (
ClickHouseBaseDbApiOperator,
)
class ClickHouseBranchSQLOperator(
sql.BranchSQLOperator,
ClickHouseBaseDbApiOperator,
):
"""
temporary workaround for Airflow < 2.9.4
see https://github.com/bryzgaloff/airflow-clickhouse-plugin/issues/87
"""
pass
@dag(
dag_id=pathlib.Path(__file__).stem,
schedule=None,
start_date=datetime.datetime(2024, 9, 1, 0, 0, 0),
catchup=False,
dag_display_name="Create sample_table",
tags=["sample", "clickhouse", "ddl"],
max_active_runs=1,
)
def sample_ddl_stats():
check_tbl_exists = ClickHouseBranchSQLOperator(
task_id='check_if_sample_table_exists',
sql='EXISTS sample_table',
conn_id='ch_default',
follow_task_ids_if_true='do_nothing',
follow_task_ids_if_false='create_sample_table',
)
do_nothing = EmptyOperator(task_id="do_nothing")
create_tbl = ClickHouseOperator(
task_id='create_sample_table',
sql="""
CREATE TABLE IF NOT EXISTS sample_table
(
id UInt32,
value Float64,
category Enum8('A' = 1, 'B' = 2, 'C' = 3)
) ENGINE = MergeTree() ORDER BY id;
""",
clickhouse_conn_id='ch_default',
)
check_tbl_exists >> [create_tbl, do_nothing]
my_dag = sample_ddl_stats()
if __name__ == '__main__':
my_dag.test()