Skip to content

Commit

Permalink
RESET LATER
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelofern committed Jan 16, 2025
1 parent 9753dae commit ac96099
Show file tree
Hide file tree
Showing 2 changed files with 400 additions and 0 deletions.
190 changes: 190 additions & 0 deletions src/django_pg_migration_tools/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,31 @@ class ConstraintQueries:
""")


class ViewQueries:
CREATE_VIEW_AS_SELECT = "CREATE VIEW {view_name} AS SELECT * FROM {table_name};"
DROP_VIEW_IF_EXISTS = "DROP VIEW IF EXISTS {view_name};"
CHECK_VIEW_EXISTS = dedent("""
SELECT 1
FROM pg_catalog.pg_class
WHERE (
relname = {view_name}
AND relkind = 'v'
);
""")


class TableQueries:
CHECK_TABLE_EXISTS = dedent("""
SELECT 1
FROM pg_catalog.pg_class
WHERE (
relname = {table_name}
AND relkind = 'r'
);
""")
ALTER_TABLE_RENAME_TO = "ALTER TABLE {old_name} RENAME TO {new_name};"


class ColumnQueries:
ALTER_TABLE_ADD_NULL_COLUMN = dedent("""
ALTER TABLE {table_name}
Expand Down Expand Up @@ -1684,3 +1709,168 @@ def describe(self) -> str:
f"{base}. Note: Using django_pg_migration_tools "
f"SaferRemoveCheckConstraint operation."
)


class TableRenameMustBeInsideTransaction(Exception):
pass


class TableRenameManager(base_operations.Operation):
def __init__(
self,
app_label: str,
schema_editor: base_schema.BaseDatabaseSchemaEditor,
from_state: migrations.state.ProjectState,
to_state: migrations.state.ProjectState,
old_name: str,
new_name: str,
reverse: bool,
):
self.app_label = app_label
self.schema_editor = schema_editor
self.from_state = from_state
self.to_state = to_state
self.old_name = old_name
self.new_name = new_name

if reverse:
self.new_model = from_state.apps.get_model(app_label, new_name)
self.old_model = to_state.apps.get_model(app_label, old_name)
else:
self.new_model = to_state.apps.get_model(app_label, new_name)
self.old_model = from_state.apps.get_model(app_label, old_name)

self.new_table_name = self.new_model._meta.db_table
self.old_table_name = self.old_model._meta.db_table

self._validate()

def rename_table_and_add_view(self) -> None:
if not self.allow_migrate_model(
self.schema_editor.connection.alias, self.new_model
):
return

if self._view_exists(name=self.old_table_name):
# If the view already exists, we might have already run the DDLs
# manually during quiet hours and are running the migration to
# achieve Django state consistency. We don't need to do anything
# else here.
return

self._alter_table_rename_to(
old_name=self.old_table_name, new_name=self.new_table_name
)
self._create_view_as_select(
source=self.new_table_name, alias=self.old_table_name
)

def drop_view_and_rename_table(self) -> None:
if not self.allow_migrate_model(
self.schema_editor.connection.alias, self.new_model
):
return

if self._table_exists(name=self.old_table_name):
return

self._drop_view_if_exists(name=self.old_table_name)
self._alter_table_rename_to(
old_name=self.new_table_name, new_name=self.old_table_name
)

def _view_exists(self, name: str) -> bool:
return _run_introspection_query(
self.schema_editor,
psycopg_sql.SQL(ViewQueries.CHECK_VIEW_EXISTS)
.format(view_name=psycopg_sql.Literal(name))
.as_string(self.schema_editor.connection.connection),
)

def _table_exists(self, name: str) -> bool:
return _run_introspection_query(
self.schema_editor,
psycopg_sql.SQL(TableQueries.CHECK_TABLE_EXISTS)
.format(table_name=psycopg_sql.Literal(name))
.as_string(self.schema_editor.connection.connection),
)

def _alter_table_rename_to(self, old_name: str, new_name: str) -> None:
self.schema_editor.execute(
psycopg_sql.SQL(TableQueries.ALTER_TABLE_RENAME_TO)
.format(
old_name=psycopg_sql.Identifier(old_name),
new_name=psycopg_sql.Identifier(new_name),
)
.as_string(self.schema_editor.connection.connection)
)

def _create_view_as_select(self, source: str, alias: str) -> None:
self.schema_editor.execute(
psycopg_sql.SQL(ViewQueries.CREATE_VIEW_AS_SELECT)
.format(
table_name=psycopg_sql.Identifier(source),
view_name=psycopg_sql.Identifier(alias),
)
.as_string(self.schema_editor.connection.connection)
)

def _drop_view_if_exists(self, name: str) -> None:
self.schema_editor.execute(
psycopg_sql.SQL(ViewQueries.DROP_VIEW_IF_EXISTS)
.format(view_name=psycopg_sql.Identifier(name))
.as_string(self.schema_editor.connection.connection)
)

def _validate(self) -> None:
if not self.schema_editor.connection.in_atomic_block:
raise TableRenameMustBeInsideTransaction(
"Can't rename a table outside a transaction. Please set "
"atomic = True on the migration."
)


class SaferRenameModelPart1(operation_models.RenameModel):
old_name: str
new_name: str

def database_forwards(
self,
app_label: str,
schema_editor: base_schema.BaseDatabaseSchemaEditor,
from_state: migrations.state.ProjectState,
to_state: migrations.state.ProjectState,
) -> None:
TableRenameManager(
app_label=app_label,
schema_editor=schema_editor,
from_state=from_state,
to_state=to_state,
old_name=self.old_name,
new_name=self.new_name,
reverse=False,
).rename_table_and_add_view()

def database_backwards(
self,
app_label: str,
schema_editor: base_schema.BaseDatabaseSchemaEditor,
from_state: migrations.state.ProjectState,
to_state: migrations.state.ProjectState,
) -> None:
TableRenameManager(
app_label=app_label,
schema_editor=schema_editor,
from_state=from_state,
to_state=to_state,
old_name=self.old_name,
new_name=self.new_name,
reverse=True,
).drop_view_and_rename_table()

def describe(self) -> str:
base = super().describe()
return (
f"{base}. Note: Using django_pg_migration_tools "
f"SaferRenameModelPart1 operation."
)
Loading

0 comments on commit ac96099

Please sign in to comment.