Skip to content

Commit d745fc3

Browse files
authored
Fix: State sync snapshot count() quoting (#3954)
1 parent 0ffa844 commit d745fc3

File tree

3 files changed

+62
-1
lines changed

3 files changed

+62
-1
lines changed

sqlmesh/core/state_sync/db/snapshot.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sqlmesh.core.state_sync.db.utils import (
1515
snapshot_name_version_filter,
1616
snapshot_id_filter,
17+
fetchone,
1718
fetchall,
1819
create_batches,
1920
)
@@ -385,7 +386,7 @@ def update_auto_restatements(
385386

386387
def count(self) -> int:
387388
"""Counts the number of snapshots in the state."""
388-
result = self.engine_adapter.fetchone(exp.select("COUNT(*)").from_(self.snapshots_table))
389+
result = fetchone(self.engine_adapter, exp.select("COUNT(*)").from_(self.snapshots_table))
389390
return result[0] if result else 0
390391

391392
def clear_cache(self) -> None:

sqlmesh/migrations/v0055_add_updated_ts_unpaused_ts_ttl_ms_unrestorable_to_snapshot.py

+24
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,30 @@ def migrate(state_sync, **kwargs): # type: ignore
4646
]
4747
engine_adapter.execute(add_column_exps)
4848

49+
if engine_adapter.dialect == "databricks":
50+
# Databricks will throw an error like:
51+
# > databricks.sql.exc.ServerOperationError: [DELTA_UNSUPPORTED_DROP_COLUMN] DROP COLUMN is not supported for your Delta table.
52+
# when we try to drop `expiration_ts` below unless we set delta.columnMapping.mode to 'name'
53+
alter_table_exp = exp.Alter(
54+
this=exp.to_table(snapshots_table),
55+
kind="TABLE",
56+
actions=[
57+
exp.AlterSet(
58+
expressions=[
59+
exp.Properties(
60+
expressions=[
61+
exp.Property(
62+
this=exp.Literal.string("delta.columnMapping.mode"),
63+
value=exp.Literal.string("name"),
64+
)
65+
]
66+
)
67+
]
68+
)
69+
],
70+
)
71+
engine_adapter.execute(alter_table_exp)
72+
4973
drop_column_exp = exp.Alter(
5074
this=exp.to_table(snapshots_table),
5175
kind="TABLE",

tests/core/engine_adapter/integration/test_integration.py

+36
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
from sqlmesh import Config, Context
1919
from sqlmesh.cli.example_project import init_example_project
2020
from sqlmesh.core.config import load_config_from_paths
21+
from sqlmesh.core.config.connection import ConnectionConfig
2122
import sqlmesh.core.dialect as d
2223
from sqlmesh.core.dialect import select_from_values
2324
from sqlmesh.core.model import Model, load_sql_based_model
2425
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
2526
from sqlmesh.core.engine_adapter.mixins import RowDiffMixin
2627
from sqlmesh.core.model.definition import create_sql_model
2728
from sqlmesh.core.plan import Plan
29+
from sqlmesh.core.state_sync.db import EngineAdapterStateSync
2830
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
2931
from sqlmesh.utils.date import now, to_date, to_time_column
3032
from sqlmesh.core.table_diff import TableDiff
@@ -2667,3 +2669,37 @@ def test_table_diff_identical_dataset(ctx: TestContext):
26672669
assert row_diff.stats["t_only_count"] == 0
26682670
assert row_diff.s_sample.shape == (0, 3)
26692671
assert row_diff.t_sample.shape == (0, 3)
2672+
2673+
2674+
def test_state_migrate_from_scratch(ctx: TestContext):
2675+
if ctx.test_type != "query":
2676+
pytest.skip("state migration tests are only relevant for query")
2677+
2678+
test_schema = ctx.add_test_suffix("state")
2679+
ctx._schemas.append(test_schema) # so it gets cleaned up when the test finishes
2680+
2681+
def _use_warehouse_as_state_connection(gateway_name: str, config: Config):
2682+
warehouse_connection = config.gateways[gateway_name].connection
2683+
assert isinstance(warehouse_connection, ConnectionConfig)
2684+
if warehouse_connection.is_forbidden_for_state_sync:
2685+
pytest.skip(
2686+
f"{warehouse_connection.type_} doesnt support being used as a state connection"
2687+
)
2688+
2689+
# this triggers the fallback to using the warehouse as a state connection
2690+
config.gateways[gateway_name].state_connection = None
2691+
assert config.get_state_connection(gateway_name) is None
2692+
2693+
config.gateways[gateway_name].state_schema = test_schema
2694+
2695+
sqlmesh_context = ctx.create_context(config_mutator=_use_warehouse_as_state_connection)
2696+
assert sqlmesh_context.config.get_state_schema(ctx.gateway) == test_schema
2697+
2698+
state_sync = (
2699+
sqlmesh_context._new_state_sync()
2700+
) # this prevents migrate() being called which it does if you access the state_sync property
2701+
assert isinstance(state_sync, EngineAdapterStateSync)
2702+
assert state_sync.engine_adapter.dialect == ctx.dialect
2703+
2704+
# will throw if one of the migrations produces an error, which can happen if we forget to take quoting or normalization into account
2705+
sqlmesh_context.migrate()

0 commit comments

Comments
 (0)