Skip to content

Commit

Permalink
[IMP] util.delete_unused
Browse files Browse the repository at this point in the history
Also consider child records when searching for record usage.

i.e. when try to remove a product category, also search for its sub
categories as they will be cascade deleted if unused.

closes #205

Signed-off-by: Nicolas Seinlet (nse) <nse@odoo.com>
Co-authored-by: Alvaro Fuentes <afu@odoo.com>
  • Loading branch information
KangOl and aj-fuentes committed Feb 20, 2025
1 parent e1c9190 commit 52a7732
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 19 deletions.
93 changes: 93 additions & 0 deletions src/base/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,99 @@ def test_replace_record_references_batch__uniqueness(self):
[count] = self.env.cr.fetchone()
self.assertEqual(count, 1)

def _prepare_test_delete_unused(self):
def create_cat():
name = f"test_{uuid.uuid4().hex}"
cat = self.env["res.partner.category"].create({"name": name})
self.env["ir.model.data"].create(
{"name": name, "module": "base", "model": "res.partner.category", "res_id": cat.id}
)
return cat

cat_1 = create_cat()
cat_2 = create_cat()
cat_3 = create_cat()

# `category_id` is a m2m, so in ON DELETE CASCADE. We need a m2o.
self.env.cr.execute(
"ALTER TABLE res_partner ADD COLUMN _cat_id integer REFERENCES res_partner_category(id) ON DELETE SET NULL"
)
p1 = self.env["res.partner"].create({"name": "test delete_unused"})

# set the `_cat_id` value in SQL as it is not know by the ORM
self.env.cr.execute("UPDATE res_partner SET _cat_id=%s WHERE id=%s", [cat_1.id, p1.id])

if hasattr(self, "_savepoint_id"):
self.addCleanup(self.env.cr.execute, f"SAVEPOINT test_{self._savepoint_id}")
self.addCleanup(cat_1.unlink)
self.addCleanup(cat_2.unlink)
self.addCleanup(cat_3.unlink)
self.addCleanup(p1.unlink)

self.addCleanup(self.env.cr.execute, "ALTER TABLE res_partner DROP COLUMN _cat_id")

return cat_1, cat_2, cat_3

def test_delete_unused_base(self):
tx = self.env["res.currency"].create({"name": "TX1", "symbol": "TX1"})
self.env["ir.model.data"].create({"name": "TX1", "module": "base", "model": "res.currency", "res_id": tx.id})

deleted = util.delete_unused(self.env.cr, "base.TX1")
self.assertEqual(deleted, ["base.TX1"])
self.assertFalse(tx.exists())

def test_delete_unused_cascade(self):
cat_1, cat_2, cat_3 = self._prepare_test_delete_unused()
deleted = util.delete_unused(self.env.cr, f"base.{cat_1.name}", f"base.{cat_2.name}", f"base.{cat_3.name}")

self.assertEqual(set(deleted), {f"base.{cat_2.name}", f"base.{cat_3.name}"})
self.assertTrue(cat_1.exists())
self.assertFalse(cat_2.exists())
self.assertFalse(cat_3.exists())

def test_delete_unused_tree(self):
cat_1, cat_2, cat_3 = self._prepare_test_delete_unused()

cat_1.parent_id = cat_2.id
cat_2.parent_id = cat_3.id
util.flush(cat_1)
util.flush(cat_2)

deleted = util.delete_unused(self.env.cr, f"base.{cat_3.name}")

self.assertEqual(deleted, [])
self.assertTrue(cat_1.exists())
self.assertTrue(cat_2.exists())
self.assertTrue(cat_3.exists())

def test_delete_unused_multi_cascade_fk(self):
"""
When there are multiple children, the hierarchy can be build from different columns
cat_3
| via `_test_id`
cat_2
| via `parent_id`
cat_1
"""
cat_1, cat_2, cat_3 = self._prepare_test_delete_unused()

self.env.cr.execute(
"ALTER TABLE res_partner_category ADD COLUMN _test_id integer REFERENCES res_partner_category(id) ON DELETE CASCADE"
)
self.addCleanup(self.env.cr.execute, "ALTER TABLE res_partner_category DROP COLUMN _test_id")

cat_1.parent_id = cat_2.id
util.flush(cat_1)
self.env.cr.execute("UPDATE res_partner_category SET _test_id = %s WHERE id = %s", [cat_3.id, cat_2.id])

deleted = util.delete_unused(self.env.cr, f"base.{cat_3.name}")

self.assertEqual(deleted, [])
self.assertTrue(cat_1.exists())
self.assertTrue(cat_2.exists())
self.assertTrue(cat_3.exists())


class TestEditView(UnitTestCase):
@parametrize(
Expand Down
85 changes: 66 additions & 19 deletions src/util/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,9 @@ def delete_unused(cr, *xmlids, **kwargs):
Remove unused records.
This function will remove records pointed by `xmlids` only if they are not referenced
from any table.
from any table. For hierarchical records (like product categories), it verifies
if the children marked as cascade removal are also not referenced. In which case
the record and its children are all removed.
.. note::
The records that cannot be removed are set as `noupdate=True`.
Expand Down Expand Up @@ -1250,9 +1252,45 @@ def delete_unused(cr, *xmlids, **kwargs):
table = table_of_model(cr, model)
res_id_to_xmlid = dict(zip(ids, xids))

sub = " UNION ".join(
cascade_children = [
fk_col
for fk_tbl, fk_col, _, fk_act in get_fk(cr, table, quote_ident=False)
if fk_tbl == table and fk_act == "c"
]
if cascade_children:
if len(cascade_children) == 1:
join = format_query(cr, "t.{}", cascade_children[0])
else:
join = sql.SQL("ANY(ARRAY[{}])").format(
sql.SQL(", ").join(sql.Composed([sql.SQL("t."), sql.Identifier(cc)]) for cc in cascade_children)
)

kids_query = format_query(
cr,
"""
WITH RECURSIVE _child AS (
SELECT id AS root, id AS child
FROM {0}
WHERE id = ANY(%(ids)s)
UNION -- don't use UNION ALL in case we have a loop
SELECT c.root, t.id as child
FROM {0} t
JOIN _child c
ON c.child = {1}
)
SELECT root AS id, array_agg(child) AS children
FROM _child
GROUP BY root
""",
table,
join,
)
else:
kids_query = format_query(cr, "SELECT id, ARRAY[id] AS children FROM {0} WHERE id = ANY(%(ids)s)", table)

sub = " UNION ALL ".join(
[
'SELECT 1 FROM "{}" x WHERE x."{}" = t.id'.format(fk_tbl, fk_col)
format_query(cr, "SELECT 1 FROM {} x WHERE x.{} = ANY(s.children)", fk_tbl, fk_col)
for fk_tbl, fk_col, _, fk_act in get_fk(cr, table, quote_ident=False)
# ignore "on delete cascade" fk (they are indirect dependencies (lines or m2m))
if fk_act != "c"
Expand All @@ -1261,18 +1299,27 @@ def delete_unused(cr, *xmlids, **kwargs):
]
)
if sub:
cr.execute(
"""
SELECT id
FROM "{}" t
WHERE id = ANY(%s)
AND NOT EXISTS({})
""".format(table, sub),
[list(ids)],
query = format_query(
cr,
r"""
WITH _kids AS (
{}
)
SELECT t.id
FROM {} t
JOIN _kids s
ON s.id = t.id
WHERE NOT EXISTS({})
""",
kids_query,
table,
sql.SQL(sub),
)
ids = map(itemgetter(0), cr.fetchall()) # noqa: PLW2901
cr.execute(query, {"ids": list(ids)})
sub_ids = list(map(itemgetter(0), cr.fetchall()))
else:
sub_ids = list(ids)

ids = list(ids) # noqa: PLW2901
if model == "res.lang" and table_exists(cr, "ir_translation"):
cr.execute(
"""
Expand All @@ -1281,16 +1328,16 @@ def delete_unused(cr, *xmlids, **kwargs):
WHERE t.lang = l.code
AND l.id = ANY(%s)
""",
[ids],
[sub_ids],
)
for tid in ids:
remove_record(cr, (model, tid))
deleted.append(res_id_to_xmlid[tid])

remove_records(cr, model, sub_ids)
deleted.extend(res_id_to_xmlid[r] for r in sub_ids if r in res_id_to_xmlid)

if deactivate:
deactivate_ids = tuple(set(res_id_to_xmlid.keys()) - set(ids))
deactivate_ids = tuple(set(sub_ids) - set(ids))
if deactivate_ids:
cr.execute('UPDATE "{}" SET active = false WHERE id IN %s'.format(table), [deactivate_ids])
cr.execute(format_query(cr, "UPDATE {} SET active = false WHERE id IN %s", table), [deactivate_ids])

if not keep_xmlids:
query = """
Expand Down

0 comments on commit 52a7732

Please sign in to comment.