31
31
from sqlmesh .core .dialect import parse , schema_
32
32
from sqlmesh .core .engine_adapter .duckdb import DuckDBEngineAdapter
33
33
from sqlmesh .core .environment import Environment , EnvironmentNamingInfo , EnvironmentStatements
34
- from sqlmesh .core .macros import MacroEvaluator
34
+ from sqlmesh .core .macros import MacroEvaluator , RuntimeStage
35
35
from sqlmesh .core .model import load_sql_based_model , model , SqlModel , Model
36
36
from sqlmesh .core .model .cache import OptimizedQueryCache
37
37
from sqlmesh .core .renderer import render_statements
@@ -1444,7 +1444,7 @@ def test_environment_statements(tmp_path: pathlib.Path):
1444
1444
1445
1445
@macro()
1446
1446
def grant_select_privileges(evaluator):
1447
- if evaluator._environment_naming_info:
1447
+ if evaluator._environment_naming_info and evaluator.runtime_stage == 'before_all' :
1448
1448
mapping = to_view_mapping(
1449
1449
evaluator._snapshots.values(), evaluator._environment_naming_info
1450
1450
)
@@ -1493,7 +1493,10 @@ def grant_schema_usage(evaluator):
1493
1493
assert isinstance (python_env ["grant_select_privileges" ], Executable )
1494
1494
1495
1495
before_all_rendered = render_statements (
1496
- statements = before_all , dialect = dialect , python_env = python_env
1496
+ statements = before_all ,
1497
+ dialect = dialect ,
1498
+ python_env = python_env ,
1499
+ runtime_stage = RuntimeStage .BEFORE_ALL ,
1497
1500
)
1498
1501
1499
1502
assert before_all_rendered == [
@@ -1506,6 +1509,7 @@ def grant_schema_usage(evaluator):
1506
1509
python_env = python_env ,
1507
1510
snapshots = snapshots ,
1508
1511
environment_naming_info = EnvironmentNamingInfo (name = "prod" ),
1512
+ runtime_stage = RuntimeStage .BEFORE_ALL ,
1509
1513
)
1510
1514
1511
1515
assert after_all_rendered == [
@@ -1519,6 +1523,7 @@ def grant_schema_usage(evaluator):
1519
1523
python_env = python_env ,
1520
1524
snapshots = snapshots ,
1521
1525
environment_naming_info = EnvironmentNamingInfo (name = "dev" ),
1526
+ runtime_stage = RuntimeStage .BEFORE_ALL ,
1522
1527
)
1523
1528
1524
1529
assert after_all_rendered_dev == [
@@ -1534,7 +1539,7 @@ def test_plan_environment_statements(tmp_path: pathlib.Path):
1534
1539
1535
1540
config = Config (
1536
1541
model_defaults = ModelDefaultsConfig (dialect = dialect ),
1537
- before_all = ["@create_stats_table()" ],
1542
+ before_all = ["@create_stats_table()" , "@access_adapter()" ],
1538
1543
after_all = ["CREATE TABLE IF NOT EXISTS after_table AS SELECT @some_var" ],
1539
1544
variables = {"some_var" : 5 },
1540
1545
)
@@ -1578,9 +1583,34 @@ def create_stats_table(evaluator):
1578
1583
""" ,
1579
1584
)
1580
1585
1586
+ create_temp_file (
1587
+ tmp_path ,
1588
+ pathlib .Path (macros_dir , "access_adapter.py" ),
1589
+ """
1590
+ from sqlmesh.core.macros import macro
1591
+
1592
+ @macro()
1593
+ def access_adapter(evaluator):
1594
+ if evaluator.runtime_stage == 'before_all':
1595
+ engine_adapter = evaluator.engine_adapter
1596
+ for i in range(10):
1597
+ try:
1598
+ sql_inside_macro = f"CREATE TABLE IF NOT EXISTS db_connect AS SELECT {i} as 'access_attempt'"
1599
+ engine_adapter.execute(sql_inside_macro)
1600
+ return None
1601
+ except Exception as e:
1602
+ sleep(10)
1603
+ raise Exception(f"Failed to connect to the database")
1604
+ """ ,
1605
+ )
1606
+
1581
1607
context = Context (paths = tmp_path , config = config )
1582
1608
1583
- assert context ._environment_statements [0 ].before_all == ["@create_stats_table()" ]
1609
+ assert context ._environment_statements [0 ].before_all == [
1610
+ "@create_stats_table()" ,
1611
+ "@access_adapter()" ,
1612
+ ]
1613
+
1584
1614
assert context ._environment_statements [0 ].after_all == [
1585
1615
"CREATE TABLE IF NOT EXISTS after_table AS SELECT @some_var"
1586
1616
]
@@ -1619,6 +1649,11 @@ def create_stats_table(evaluator):
1619
1649
assert state_table [0 ].after_all == context ._environment_statements [0 ].after_all
1620
1650
assert state_table [0 ].python_env == context ._environment_statements [0 ].python_env
1621
1651
1652
+ # This table will be created inside the macro by accessing the engine_adapter directly
1653
+ inside_macro_execute = context .fetchdf ("select * from memory.db_connect" ).to_dict ()
1654
+ assert (attempt_column := inside_macro_execute .get ("access_attempt" ))
1655
+ assert isinstance (attempt_column , dict ) and attempt_column [0 ] < 10
1656
+
1622
1657
1623
1658
def test_environment_statements_dialect (tmp_path : Path ):
1624
1659
before_all = [
0 commit comments