Skip to content

Commit 3653541

Browse files
authored
Feat!: Allow opting out of the time_column being automatically added to the partitioned_by columns (#3941)
1 parent 5ec13d2 commit 3653541

File tree

8 files changed

+79
-13
lines changed

8 files changed

+79
-13
lines changed

docs/concepts/models/model_kinds.md

+19
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,25 @@ WHERE
408408
AND event_date BETWEEN @start_ds AND @end_ds; -- `event_date` time column filter automatically added by SQLMesh
409409
```
410410

411+
### Partitioning
412+
413+
By default, we ensure that the `time_column` is part of the [partitioned_by](./overview.md#partitioned_by) property of the model so that it forms part of the partition key and allows the database engine to do partition pruning. If it is not explicitly listed in the Model definition, we will automatically add it.
414+
415+
However, this may be undesirable if you want to exclusively partition on another column or you want to partition on something like `month(time_column)` but the engine you're using doesnt support partitioning based on expressions.
416+
417+
To opt out of this behaviour, you can set `partition_by_time_column false` like so:
418+
419+
```sql linenums="1" hl_lines="5"
420+
MODEL (
421+
name db.events,
422+
kind INCREMENTAL_BY_TIME_RANGE (
423+
time_column event_date,
424+
partition_by_time_column false
425+
),
426+
partitioned_by (other_col) -- event_date will no longer be automatically added here and the partition key will just be 'other_col'
427+
);
428+
```
429+
411430
### Idempotency
412431
We recommend making sure incremental by time range model queries are [idempotent](../glossary.md#idempotency) to prevent unexpected results during data [restatement](../plans.md#restatement-plans).
413432

sqlmesh/core/model/definition.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -1180,12 +1180,18 @@ def full_depends_on(self) -> t.Set[str]:
11801180
def partitioned_by(self) -> t.List[exp.Expression]:
11811181
"""Columns to partition the model by, including the time column if it is not already included."""
11821182
if self.time_column and not self._is_time_column_in_partitioned_by:
1183-
return [
1184-
TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)(
1185-
self.time_column.column, self.columns_to_types
1186-
),
1187-
*self.partitioned_by_,
1188-
]
1183+
# This allows the user to opt out of automatic time_column injection
1184+
# by setting `partition_by_time_column false` on the model kind
1185+
if (
1186+
hasattr(self.kind, "partition_by_time_column")
1187+
and self.kind.partition_by_time_column
1188+
):
1189+
return [
1190+
TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)(
1191+
self.time_column.column, self.columns_to_types
1192+
),
1193+
*self.partitioned_by_,
1194+
]
11891195
return self.partitioned_by_
11901196

11911197
@property

sqlmesh/core/model/kind.py

+7
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ class IncrementalByTimeRangeKind(_IncrementalBy):
405405
)
406406
time_column: TimeColumn
407407
auto_restatement_intervals: t.Optional[SQLGlotPositiveInt] = None
408+
partition_by_time_column: SQLGlotBool = True
408409

409410
_time_column_validator = TimeColumn.validator()
410411

@@ -415,6 +416,11 @@ def to_expression(
415416
expressions=[
416417
*(expressions or []),
417418
self.time_column.to_property(kwargs.get("dialect") or ""),
419+
*_properties(
420+
{
421+
"partition_by_time_column": self.partition_by_time_column,
422+
}
423+
),
418424
*(
419425
[_property("auto_restatement_intervals", self.auto_restatement_intervals)]
420426
if self.auto_restatement_intervals is not None
@@ -431,6 +437,7 @@ def data_hash_values(self) -> t.List[t.Optional[str]]:
431437
def metadata_hash_values(self) -> t.List[t.Optional[str]]:
432438
return [
433439
*super().metadata_hash_values,
440+
str(self.partition_by_time_column),
434441
str(self.auto_restatement_intervals)
435442
if self.auto_restatement_intervals is not None
436443
else None,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Add 'partition_by_time_column' property to the IncrementalByTimeRange model kind
2+
(default: True to keep the original behaviour)"""
3+
4+
5+
def migrate(state_sync, **kwargs): # type: ignore
6+
pass

tests/core/test_model.py

+26
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,27 @@ def test_partitioned_by(
345345
] == partition_by_output
346346

347347

348+
def test_opt_out_of_time_column_in_partitioned_by():
349+
expressions = d.parse(
350+
"""
351+
MODEL (
352+
name db.table,
353+
dialect bigquery,
354+
partitioned_by b,
355+
kind INCREMENTAL_BY_TIME_RANGE(
356+
time_column a,
357+
partition_by_time_column false
358+
),
359+
);
360+
361+
SELECT 1::int AS a, 2::int AS b;
362+
"""
363+
)
364+
365+
model = load_sql_based_model(expressions)
366+
assert model.partitioned_by == [exp.to_column('"b"')]
367+
368+
348369
def test_no_model_statement(tmp_path: Path):
349370
# No name inference => MODEL (...) is required
350371
expressions = d.parse("SELECT 1 AS x")
@@ -1298,6 +1319,7 @@ def test_render_definition():
12981319
dialect spark,
12991320
kind INCREMENTAL_BY_TIME_RANGE (
13001321
time_column (`a`, 'yyyymmdd'),
1322+
partition_by_time_column TRUE,
13011323
forward_only FALSE,
13021324
disable_restatement FALSE,
13031325
on_destructive_change 'ERROR'
@@ -6330,6 +6352,7 @@ def test_model_kind_to_expression():
63306352
.sql()
63316353
== """INCREMENTAL_BY_TIME_RANGE (
63326354
time_column ("a", '%Y-%m-%d'),
6355+
partition_by_time_column TRUE,
63336356
forward_only FALSE,
63346357
disable_restatement FALSE,
63356358
on_destructive_change 'ERROR'
@@ -6360,6 +6383,7 @@ def test_model_kind_to_expression():
63606383
.sql()
63616384
== """INCREMENTAL_BY_TIME_RANGE (
63626385
time_column ("a", '%Y-%m-%d'),
6386+
partition_by_time_column TRUE,
63636387
batch_size 1,
63646388
batch_concurrency 2,
63656389
lookback 3,
@@ -7365,6 +7389,7 @@ def test_auto_restatement():
73657389
model.kind.to_expression().sql(pretty=True)
73667390
== """INCREMENTAL_BY_TIME_RANGE (
73677391
time_column ("a", '%Y-%m-%d'),
7392+
partition_by_time_column TRUE,
73687393
forward_only FALSE,
73697394
disable_restatement FALSE,
73707395
on_destructive_change 'ERROR',
@@ -7392,6 +7417,7 @@ def test_auto_restatement():
73927417
model.kind.to_expression().sql(pretty=True)
73937418
== """INCREMENTAL_BY_TIME_RANGE (
73947419
time_column ("a", '%Y-%m-%d'),
7420+
partition_by_time_column TRUE,
73957421
auto_restatement_intervals 1,
73967422
forward_only FALSE,
73977423
disable_restatement FALSE,

tests/core/test_snapshot.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def test_json(snapshot: Snapshot):
130130
"batch_size": 30,
131131
"forward_only": False,
132132
"on_destructive_change": "ERROR",
133+
"partition_by_time_column": True,
133134
"disable_restatement": False,
134135
"dialect": "spark",
135136
},
@@ -859,7 +860,7 @@ def test_fingerprint(model: Model, parent_model: Model):
859860

860861
original_fingerprint = SnapshotFingerprint(
861862
data_hash="1312415267",
862-
metadata_hash="2967945306",
863+
metadata_hash="2906564841",
863864
)
864865

865866
assert fingerprint == original_fingerprint
@@ -959,7 +960,7 @@ def test_fingerprint_jinja_macros(model: Model):
959960
)
960961
original_fingerprint = SnapshotFingerprint(
961962
data_hash="923305614",
962-
metadata_hash="2967945306",
963+
metadata_hash="2906564841",
963964
)
964965

965966
fingerprint = fingerprint_from_node(model, nodes={})

tests/integrations/github/cicd/test_integration.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def test_merge_pr_has_non_breaking_change(
165165
166166
+++
167167
168-
@@ -15,7 +15,8 @@
168+
@@ -16,7 +16,8 @@
169169
170170
SELECT
171171
CAST(o.waiter_id AS INT) AS waiter_id,
@@ -362,7 +362,7 @@ def test_merge_pr_has_non_breaking_change_diff_start(
362362
363363
+++
364364
365-
@@ -15,7 +15,8 @@
365+
@@ -16,7 +16,8 @@
366366
367367
SELECT
368368
CAST(o.waiter_id AS INT) AS waiter_id,
@@ -868,7 +868,7 @@ def test_no_merge_since_no_deploy_signal(
868868
869869
+++
870870
871-
@@ -15,7 +15,8 @@
871+
@@ -16,7 +16,8 @@
872872
873873
SELECT
874874
CAST(o.waiter_id AS INT) AS waiter_id,
@@ -1049,7 +1049,7 @@ def test_no_merge_since_no_deploy_signal_no_approvers_defined(
10491049
10501050
+++
10511051
1052-
@@ -15,7 +15,8 @@
1052+
@@ -16,7 +16,8 @@
10531053
10541054
SELECT
10551055
CAST(o.waiter_id AS INT) AS waiter_id,
@@ -1219,7 +1219,7 @@ def test_deploy_comment_pre_categorized(
12191219
12201220
+++
12211221
1222-
@@ -15,7 +15,8 @@
1222+
@@ -16,7 +16,8 @@
12231223
12241224
SELECT
12251225
CAST(o.waiter_id AS INT) AS waiter_id,

tests/schedulers/airflow/test_client.py

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
113113
"time_column": {"column": "`ds`"},
114114
"forward_only": False,
115115
"on_destructive_change": "ERROR",
116+
"partition_by_time_column": True,
116117
"disable_restatement": False,
117118
"dialect": "spark",
118119
},

0 commit comments

Comments
 (0)