Skip to content

Commit 6d6b644

Browse files
authored
Fix: Migration of indirect non-breaking views (#4020)
1 parent 61ff15a commit 6d6b644

File tree

4 files changed

+106
-22
lines changed

4 files changed

+106
-22
lines changed

sqlmesh/core/plan/evaluator.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ def evaluate(
143143
deployability_index_for_evaluation,
144144
circuit_breaker=circuit_breaker,
145145
)
146-
promotion_result = self._promote(plan, snapshots, before_promote_snapshots)
146+
promotion_result = self._promote(
147+
plan, snapshots, before_promote_snapshots, deployability_index_for_creation
148+
)
147149
self._backfill(
148150
plan,
149151
snapshots_by_name,
@@ -301,6 +303,7 @@ def _promote(
301303
plan: EvaluatablePlan,
302304
snapshots: t.Dict[SnapshotId, Snapshot],
303305
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
306+
deployability_index: t.Optional[DeployabilityIndex] = None,
304307
) -> PromotionResult:
305308
"""Promote a plan.
306309
@@ -320,7 +323,8 @@ def _promote(
320323
self.snapshot_evaluator.migrate(
321324
[s for s in snapshots.values() if s.is_paused],
322325
snapshots,
323-
plan.allow_destructive_models,
326+
allow_destructive_snapshots=plan.allow_destructive_models,
327+
deployability_index=deployability_index,
324328
)
325329
except NodeExecutionFailedError as ex:
326330
raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex))

sqlmesh/core/snapshot/evaluator.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
DeployabilityIndex,
5858
Intervals,
5959
Snapshot,
60-
SnapshotChangeCategory,
6160
SnapshotId,
6261
SnapshotInfoLike,
6362
SnapshotTableCleanupTask,
@@ -390,15 +389,17 @@ def migrate(
390389
target_snapshots: t.Iterable[Snapshot],
391390
snapshots: t.Dict[SnapshotId, Snapshot],
392391
allow_destructive_snapshots: t.Set[str] = set(),
392+
deployability_index: t.Optional[DeployabilityIndex] = None,
393393
) -> None:
394394
"""Alters a physical snapshot table to match its snapshot's schema for the given collection of snapshots.
395395
396396
Args:
397397
target_snapshots: Target snapshots.
398398
snapshots: Mapping of snapshot ID to snapshot.
399399
allow_destructive_snapshots: Set of snapshots that are allowed to have destructive schema changes.
400+
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
400401
"""
401-
402+
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
402403
with self.concurrent_context():
403404
concurrent_apply_to_snapshots(
404405
target_snapshots,
@@ -407,6 +408,7 @@ def migrate(
407408
snapshots,
408409
allow_destructive_snapshots,
409410
self._get_adapter(s.model_gateway),
411+
deployability_index,
410412
),
411413
self.ddl_concurrent_tasks,
412414
)
@@ -850,15 +852,13 @@ def _migrate_snapshot(
850852
snapshots: t.Dict[SnapshotId, Snapshot],
851853
allow_destructive_snapshots: t.Set[str],
852854
adapter: EngineAdapter,
855+
deployability_index: DeployabilityIndex,
853856
) -> None:
854-
if not snapshot.is_paused or not snapshot.is_model:
855-
return
856-
857-
needs_migration = snapshot.model.forward_only or snapshot.change_category in (
858-
SnapshotChangeCategory.FORWARD_ONLY,
859-
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
860-
)
861-
if not needs_migration:
857+
if (
858+
not snapshot.is_paused
859+
or not snapshot.is_model
860+
or deployability_index.is_representative(snapshot)
861+
):
862862
return
863863

864864
target_table_name = snapshot.table_name()

tests/core/test_integration.py

+65
Original file line numberDiff line numberDiff line change
@@ -2140,6 +2140,71 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
21402140
)
21412141

21422142

2143+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2144+
def test_indirect_non_breaking_view_model_non_representative_snapshot_migration(
2145+
init_and_plan_context: t.Callable,
2146+
):
2147+
context, _ = init_and_plan_context("examples/sushi")
2148+
2149+
forward_only_model_expr = d.parse(
2150+
"""
2151+
MODEL (
2152+
name memory.sushi.forward_only_model,
2153+
kind INCREMENTAL_BY_TIME_RANGE (
2154+
time_column ds,
2155+
forward_only TRUE,
2156+
on_destructive_change 'allow',
2157+
),
2158+
);
2159+
2160+
SELECT '2023-01-07' AS ds, 1 AS a;
2161+
"""
2162+
)
2163+
forward_only_model = load_sql_based_model(forward_only_model_expr)
2164+
context.upsert_model(forward_only_model)
2165+
2166+
downstream_view_a_expr = d.parse(
2167+
"""
2168+
MODEL (
2169+
name memory.sushi.downstream_view_a,
2170+
kind VIEW,
2171+
);
2172+
2173+
SELECT a from memory.sushi.forward_only_model;
2174+
"""
2175+
)
2176+
downstream_view_a = load_sql_based_model(downstream_view_a_expr)
2177+
context.upsert_model(downstream_view_a)
2178+
2179+
downstream_view_b_expr = d.parse(
2180+
"""
2181+
MODEL (
2182+
name memory.sushi.downstream_view_b,
2183+
kind VIEW,
2184+
);
2185+
2186+
SELECT a from memory.sushi.downstream_view_a;
2187+
"""
2188+
)
2189+
downstream_view_b = load_sql_based_model(downstream_view_b_expr)
2190+
context.upsert_model(downstream_view_b)
2191+
2192+
context.plan(auto_apply=True, no_prompts=True, skip_tests=True)
2193+
2194+
# Make a forward-only change
2195+
context.upsert_model(add_projection_to_model(t.cast(SqlModel, forward_only_model)))
2196+
# Make a non-breaking change downstream
2197+
context.upsert_model(add_projection_to_model(t.cast(SqlModel, downstream_view_a)))
2198+
2199+
context.plan(auto_apply=True, no_prompts=True, skip_tests=True)
2200+
2201+
# Make sure the downstrean indirect non-breaking view is available in prod
2202+
count = context.engine_adapter.fetchone("SELECT COUNT(*) FROM memory.sushi.downstream_view_b")[
2203+
0
2204+
]
2205+
assert count > 0
2206+
2207+
21432208
@time_machine.travel("2023-01-08 15:00:00 UTC")
21442209
def test_unaligned_start_snapshot_with_non_deployable_downstream(init_and_plan_context: t.Callable):
21452210
context, _ = init_and_plan_context("examples/sushi")

tests/core/test_snapshot_evaluator.py

+25-10
Original file line numberDiff line numberDiff line change
@@ -1191,7 +1191,7 @@ def columns(table_name):
11911191
snapshot = make_snapshot(model, version="1")
11921192
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
11931193

1194-
evaluator.migrate([snapshot], {})
1194+
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
11951195

11961196
cursor_mock.execute.assert_has_calls(
11971197
[
@@ -1226,7 +1226,7 @@ def test_migrate_missing_table(mocker: MockerFixture, make_snapshot):
12261226
snapshot = make_snapshot(model, version="1")
12271227
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
12281228

1229-
evaluator.migrate([snapshot], {})
1229+
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
12301230

12311231
cursor_mock.execute.assert_has_calls(
12321232
[
@@ -1262,7 +1262,7 @@ def test_migrate_view(
12621262
snapshot = make_snapshot(model, version="1")
12631263
snapshot.change_category = change_category
12641264

1265-
evaluator.migrate([snapshot], {})
1265+
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
12661266

12671267
cursor_mock.execute.assert_has_calls(
12681268
[
@@ -1722,7 +1722,7 @@ def columns(table_name):
17221722
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
17231723

17241724
with pytest.raises(NodeExecutionFailedError) as ex:
1725-
evaluator.migrate([snapshot], {})
1725+
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
17261726

17271727
destructive_change_err = ex.value.__cause__
17281728
assert isinstance(destructive_change_err, DestructiveChangeError)
@@ -1744,15 +1744,20 @@ def columns(table_name):
17441744

17451745
logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")
17461746
with patch.object(logger, "warning") as mock_logger:
1747-
evaluator.migrate([snapshot], {})
1747+
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
17481748
assert (
17491749
mock_logger.call_args[0][0]
17501750
== "\nPlan requires a destructive change to forward-only model '\"test_schema\".\"test_model\"'s schema that drops column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 DROP COLUMN b\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN a INT"
17511751
)
17521752

17531753
# allow destructive
17541754
with patch.object(logger, "warning") as mock_logger:
1755-
evaluator.migrate([snapshot], {}, {'"test_schema"."test_model"'})
1755+
evaluator.migrate(
1756+
[snapshot],
1757+
{},
1758+
{'"test_schema"."test_model"'},
1759+
deployability_index=DeployabilityIndex.none_deployable(),
1760+
)
17561761
assert mock_logger.call_count == 0
17571762

17581763

@@ -3638,7 +3643,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc
36383643
assert new_snapshot.table_name() == snapshot.table_name()
36393644

36403645
evaluator.create([new_snapshot], {})
3641-
evaluator.migrate([new_snapshot], {})
3646+
evaluator.migrate([new_snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
36423647

36433648
common_kwargs: t.Dict[str, t.Any] = dict(
36443649
table_format=None,
@@ -3706,7 +3711,11 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
37063711

37073712
# no schema changes - no-op
37083713
adapter_mock.get_alter_expressions.return_value = []
3709-
evaluator.migrate(target_snapshots=[snapshot], snapshots={})
3714+
evaluator.migrate(
3715+
target_snapshots=[snapshot],
3716+
snapshots={},
3717+
deployability_index=DeployabilityIndex.none_deployable(),
3718+
)
37103719

37113720
adapter_mock.create_table.assert_not_called()
37123721
adapter_mock.create_managed_table.assert_not_called()
@@ -3716,7 +3725,11 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
37163725
adapter_mock.get_alter_expressions.return_value = [exp.Alter()]
37173726

37183727
with pytest.raises(NodeExecutionFailedError) as ex:
3719-
evaluator.migrate(target_snapshots=[snapshot], snapshots={})
3728+
evaluator.migrate(
3729+
target_snapshots=[snapshot],
3730+
snapshots={},
3731+
deployability_index=DeployabilityIndex.none_deployable(),
3732+
)
37203733

37213734
sqlmesh_err = ex.value.__cause__
37223735
assert isinstance(sqlmesh_err, SQLMeshError)
@@ -3907,7 +3920,9 @@ def columns(table_name):
39073920
)
39083921
snapshot_2 = make_snapshot(model_2, version="1")
39093922
snapshot_2.change_category = SnapshotChangeCategory.FORWARD_ONLY
3910-
evaluator.migrate([snapshot_1, snapshot_2], {})
3923+
evaluator.migrate(
3924+
[snapshot_1, snapshot_2], {}, deployability_index=DeployabilityIndex.none_deployable()
3925+
)
39113926

39123927
cursor_mock.execute.assert_has_calls(
39133928
[

0 commit comments

Comments
 (0)