From e6a3c461024296b6a6ff58bd878614a8972cc7c9 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 11 Feb 2025 18:30:38 -0700 Subject: [PATCH 01/43] Add background job to clear unreferenced state groups --- synapse/storage/controllers/purge_events.py | 61 ++++++++++++++++ synapse/storage/databases/main/state.py | 80 +++++++++++++++++++++ 2 files changed, 141 insertions(+) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 47cec8c469d..c91ee0dcda4 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -44,6 +44,12 @@ def __init__(self, hs: "HomeServer", stores: Databases): self._delete_state_groups_loop, 60 * 1000 ) + self._last_checked_state_group = 0 + if hs.config.worker.run_background_tasks: + self._clear_unreferenced_state_loop_call = hs.get_clock().looping_call( + self._clear_unreferenced_state_groups_loop, 60 * 1000 + ) + async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" @@ -203,3 +209,58 @@ async def _delete_state_groups( room_id, groups_to_sequences, ) + + @wrap_as_background_process("_clear_unreferenced_state_groups_loop") + async def _clear_unreferenced_state_groups_loop(self) -> None: + """Background task that deletes any state groups that may be pending + deletion.""" + + # Look for state groups that can be cleaned up. + search_limit = 100 + next_set = await self.stores.main.get_state_groups( + self._last_checked_state_group + 1, search_limit + ) + if len(next_set) < search_limit: + self._last_checked_state_group = 0 + else: + self._last_checked_state_group = list(next_set)[-1] + + if len(next_set) == 0: + return + + # TODO: add tests for this! + + referenced = await self.stores.main.get_referenced_state_groups(next_set) + next_set -= referenced + + if len(next_set) == 0: + return + + referenced = await self.stores.main.get_referenced_state_group_edges(next_set) + next_set -= referenced + + if len(next_set) == 0: + return + + # Find all state groups that can be deleted if the original set is deleted. + # This set includes the original set, as well as any state groups that would + # become unreferenced upon deleting the original set. + to_delete = await self._find_unreferenced_groups(next_set) + + if len(to_delete) == 0: + return + + pending_deletions = await self.stores.state_deletion.get_pending_deletions( + to_delete + ) + to_delete -= pending_deletions.keys() + + if len(to_delete) == 0: + return + + # TODO: state_groups_pending_deletion table is never cleaned up after deletion! + + # Mark the state groups for deletion by the deletion background task. + await self.stores.state_deletion.mark_state_groups_as_pending_deletion( + to_delete + ) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 788f7d1e325..2c3bc28e538 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -675,6 +675,86 @@ async def get_referenced_state_groups( return {row[0] for row in rows} + async def get_referenced_state_group_edges( + self, state_groups: Iterable[int] + ) -> Set[int]: + """Check if the state groups are referenced by other state groups. + + Args: + state_groups + + Returns: + The subset of state groups that are referenced. + """ + rows = cast( + List[Tuple[int]], + await self.db_pool.simple_select_many_batch( + table="state_group_edges", + column="state_group", + iterable=state_groups, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_group_edges", + ), + ) + + prev_rows = cast( + List[Tuple[int]], + await self.db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=state_groups, + keyvalues={}, + retcols=("DISTINCT prev_state_group",), + desc="get_referenced_state_group_edges_prev", + ), + ) + + state_groups = {row[0] for row in rows} + prev_groups = {row[0] for row in prev_rows} + state_groups |= prev_groups + + return state_groups + + async def get_state_groups( + self, + initial_state_group: int, + limit: int, + ) -> Set[int]: + """Get a list of stored state groups + + Args: + initial_state_group: get state groups starting with this one. + limit: the maximum number of state groups to return. + + Returns: + The set of stored state groups following the initial_state_group. + """ + return await self.db_pool.runInteraction( + "get_state_groups", + self._get_state_groups_txn, + initial_state_group, + limit, + ) + + def _get_state_groups_txn( + self, + txn: LoggingTransaction, + initial_state_group: int, + limit: int, + ) -> Set[int]: + sql = f""" + SELECT id from state_groups + WHERE id > {initial_state_group} + LIMIT {limit} + """ + + txn.execute(sql) + + rows = txn.fetchall() + + return {row[0] for row in rows} + async def update_state_for_partial_state_event( self, event: EventBase, From f9670ffee1742f742b6ff9828c2625acdb731ac2 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 11 Feb 2025 18:32:00 -0700 Subject: [PATCH 02/43] Add changelog entry --- changelog.d/18154.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/18154.feature diff --git a/changelog.d/18154.feature b/changelog.d/18154.feature new file mode 100644 index 00000000000..62e1b79a154 --- /dev/null +++ b/changelog.d/18154.feature @@ -0,0 +1 @@ +Add background job to clear unreferenced state groups. From 20efdd2ed55c6b78006510197bd8f654f5373d3b Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 12 Feb 2025 14:21:36 -0700 Subject: [PATCH 03/43] Add test for unreferenced state group cleanup --- synapse/storage/controllers/purge_events.py | 12 +-- synapse/storage/databases/main/state.py | 41 ---------- tests/storage/test_purge.py | 85 +++++++++++++++++++++ 3 files changed, 92 insertions(+), 46 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index c91ee0dcda4..7b7619d812e 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -36,6 +36,8 @@ class PurgeEventsStorageController: """High level interface for purging rooms and event history.""" + CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS = 60 * 1000 + def __init__(self, hs: "HomeServer", stores: Databases): self.stores = stores @@ -47,7 +49,7 @@ def __init__(self, hs: "HomeServer", stores: Databases): self._last_checked_state_group = 0 if hs.config.worker.run_background_tasks: self._clear_unreferenced_state_loop_call = hs.get_clock().looping_call( - self._clear_unreferenced_state_groups_loop, 60 * 1000 + self._clear_unreferenced_state_groups_loop, self.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS ) async def purge_room(self, room_id: str) -> None: @@ -228,16 +230,14 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: if len(next_set) == 0: return - # TODO: add tests for this! - referenced = await self.stores.main.get_referenced_state_groups(next_set) next_set -= referenced if len(next_set) == 0: return - referenced = await self.stores.main.get_referenced_state_group_edges(next_set) - next_set -= referenced + referenced = await self.stores.state.get_next_state_groups(next_set) + next_set -= set(referenced.values()) if len(next_set) == 0: return @@ -259,6 +259,8 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: return # TODO: state_groups_pending_deletion table is never cleaned up after deletion! + # TODO: should we be cleaning up any state_group_edges that are dangling after + # the deletion as well? # Mark the state groups for deletion by the deletion background task. await self.stores.state_deletion.mark_state_groups_as_pending_deletion( diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 2c3bc28e538..ba4d0af491f 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -675,47 +675,6 @@ async def get_referenced_state_groups( return {row[0] for row in rows} - async def get_referenced_state_group_edges( - self, state_groups: Iterable[int] - ) -> Set[int]: - """Check if the state groups are referenced by other state groups. - - Args: - state_groups - - Returns: - The subset of state groups that are referenced. - """ - rows = cast( - List[Tuple[int]], - await self.db_pool.simple_select_many_batch( - table="state_group_edges", - column="state_group", - iterable=state_groups, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_group_edges", - ), - ) - - prev_rows = cast( - List[Tuple[int]], - await self.db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=state_groups, - keyvalues={}, - retcols=("DISTINCT prev_state_group",), - desc="get_referenced_state_group_edges_prev", - ), - ) - - state_groups = {row[0] for row in rows} - prev_groups = {row[0] for row in prev_rows} - state_groups |= prev_groups - - return state_groups - async def get_state_groups( self, initial_state_group: int, diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 5d6a8518c0e..a063b7c23e2 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -270,3 +270,88 @@ def test_purge_unreferenced_state_group(self) -> None: ) ) self.assertEqual(len(state_groups), 1) + + def test_clear_unreferenced_state_groups(self) -> None: + """Test that any unreferenced state groups are automatically cleaned up. + """ + + self.helper.send(self.room_id, body="test1") + state1 = self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": 2} + ) + state2 = self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": 3} + ) + self.helper.send(self.room_id, body="test4") + last = self.helper.send(self.room_id, body="test5") + + # Create an unreferenced state group that has a prev group of one of the + # to-be-purged events. + prev_group = self.get_success( + self.store._get_state_group_for_event(state1["event_id"]) + ) + unreferenced_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=prev_group, + delta_ids={("org.matrix.test", ""): state2["event_id"]}, + current_state_ids=None, + ) + ) + + another_unreferenced_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=unreferenced_state_group, + delta_ids={("org.matrix.test", ""): state2["event_id"]}, + current_state_ids=None, + ) + ) + + # Advance so that the background job to clear unreferenced state groups runs + self.reactor.advance( + 1 + self._storage_controllers.purge_events.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS / 1000 + ) + + # Advance so that the background jobs to delete the state groups runs + self.reactor.advance( + 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000 + ) + + # We expect that the unreferenced state group has been deleted. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": unreferenced_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect that the other unreferenced state group has also been deleted. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": another_unreferenced_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect there to now only be one state group for the room, which is + # the state group of the last event (as the only outlier). + state_groups = self.get_success( + self.state_store.db_pool.simple_select_onecol( + table="state_groups", + keyvalues={"room_id": self.room_id}, + retcol="id", + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertEqual(len(state_groups), 8) From 9c50123b39aff296b68482ac35bc722f7c9770d2 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 12 Feb 2025 14:24:18 -0700 Subject: [PATCH 04/43] Remove comments --- synapse/storage/controllers/purge_events.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 7b7619d812e..21da16dc550 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -258,10 +258,6 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: if len(to_delete) == 0: return - # TODO: state_groups_pending_deletion table is never cleaned up after deletion! - # TODO: should we be cleaning up any state_group_edges that are dangling after - # the deletion as well? - # Mark the state groups for deletion by the deletion background task. await self.stores.state_deletion.mark_state_groups_as_pending_deletion( to_delete From 28679b6a34a235778f1343b774737f30cadbb260 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 12 Feb 2025 14:28:41 -0700 Subject: [PATCH 05/43] Fix linter errors --- synapse/storage/controllers/purge_events.py | 7 ++++--- tests/storage/test_purge.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 21da16dc550..2fb0428861c 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -49,7 +49,8 @@ def __init__(self, hs: "HomeServer", stores: Databases): self._last_checked_state_group = 0 if hs.config.worker.run_background_tasks: self._clear_unreferenced_state_loop_call = hs.get_clock().looping_call( - self._clear_unreferenced_state_groups_loop, self.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS + self._clear_unreferenced_state_groups_loop, + self.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS, ) async def purge_room(self, room_id: str) -> None: @@ -236,8 +237,8 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: if len(next_set) == 0: return - referenced = await self.stores.state.get_next_state_groups(next_set) - next_set -= set(referenced.values()) + next_state_groups = await self.stores.state.get_next_state_groups(next_set) + next_set -= set(next_state_groups.values()) if len(next_set) == 0: return diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index a063b7c23e2..bb63e5e4510 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -272,8 +272,7 @@ def test_purge_unreferenced_state_group(self) -> None: self.assertEqual(len(state_groups), 1) def test_clear_unreferenced_state_groups(self) -> None: - """Test that any unreferenced state groups are automatically cleaned up. - """ + """Test that any unreferenced state groups are automatically cleaned up.""" self.helper.send(self.room_id, body="test1") state1 = self.helper.send_state( @@ -312,7 +311,9 @@ def test_clear_unreferenced_state_groups(self) -> None: # Advance so that the background job to clear unreferenced state groups runs self.reactor.advance( - 1 + self._storage_controllers.purge_events.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS / 1000 + 1 + + self._storage_controllers.purge_events.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS + / 1000 ) # Advance so that the background jobs to delete the state groups runs From 977d83be7b9efd6c7bb52b83ad060f4e3b5a68a4 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 12 Feb 2025 15:23:11 -0700 Subject: [PATCH 06/43] Order state_groups --- synapse/storage/databases/main/state.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index ba4d0af491f..c5ec58217b9 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -703,8 +703,9 @@ def _get_state_groups_txn( limit: int, ) -> Set[int]: sql = f""" - SELECT id from state_groups + SELECT id FROM state_groups WHERE id > {initial_state_group} + ORDER BY id LIMIT {limit} """ From a487bcbd44a37deeb2ef5cc63a964c5b7ee10731 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 14:56:04 +0000 Subject: [PATCH 07/43] Update synapse/storage/controllers/purge_events.py Co-authored-by: Erik Johnston --- synapse/storage/controllers/purge_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 2fb0428861c..489cce01f4c 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -238,7 +238,7 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: return next_state_groups = await self.stores.state.get_next_state_groups(next_set) - next_set -= set(next_state_groups.values()) + next_set.difference_update(next_state_groups.values()) if len(next_set) == 0: return From 69d72c2eeecb49896cc1b7c73cd2347609a6aba9 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 14:56:11 +0000 Subject: [PATCH 08/43] Update synapse/storage/controllers/purge_events.py Co-authored-by: Erik Johnston --- synapse/storage/controllers/purge_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 489cce01f4c..98aa3decdeb 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -226,7 +226,7 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: if len(next_set) < search_limit: self._last_checked_state_group = 0 else: - self._last_checked_state_group = list(next_set)[-1] + self._last_checked_state_group = max(next_set) if len(next_set) == 0: return From d8bfac4d636cca7c37844b74ec4e31f6543446d9 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 14:56:34 +0000 Subject: [PATCH 09/43] Update synapse/storage/controllers/purge_events.py Co-authored-by: Erik Johnston --- synapse/storage/controllers/purge_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 98aa3decdeb..12dd61f8bb0 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -231,6 +231,7 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: if len(next_set) == 0: return + # Discard any state groups referenced directly by an event... referenced = await self.stores.main.get_referenced_state_groups(next_set) next_set -= referenced From 0955b7b42a9a2040b030c38b90051226696c2c69 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 14:56:57 +0000 Subject: [PATCH 10/43] Update synapse/storage/controllers/purge_events.py Co-authored-by: Erik Johnston --- synapse/storage/controllers/purge_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 12dd61f8bb0..a41530d60c1 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -238,6 +238,7 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: if len(next_set) == 0: return + # ... and discard any that are referenced by other state groups. next_state_groups = await self.stores.state.get_next_state_groups(next_set) next_set.difference_update(next_state_groups.values()) From ce87ba6b9cc82005975ff1f0a55f4eb1da1ba851 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 10:29:27 -0700 Subject: [PATCH 11/43] Change mark as pending deletion to do nothing on conflict --- synapse/storage/controllers/purge_events.py | 8 ----- synapse/storage/databases/state/deletion.py | 36 +++++++++++++++------ 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index a41530d60c1..6b9b6004f06 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -250,14 +250,6 @@ async def _clear_unreferenced_state_groups_loop(self) -> None: # become unreferenced upon deleting the original set. to_delete = await self._find_unreferenced_groups(next_set) - if len(to_delete) == 0: - return - - pending_deletions = await self.stores.state_deletion.get_pending_deletions( - to_delete - ) - to_delete -= pending_deletions.keys() - if len(to_delete) == 0: return diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py index d4b1c20a455..4fe4058d164 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py @@ -321,19 +321,37 @@ def _finish_persisting_txn( async def mark_state_groups_as_pending_deletion( self, state_groups: Collection[int] ) -> None: - """Mark the given state groups as pending deletion""" + """Mark the given state groups as pending deletion. - now = self._clock.time_msec() + If any of the state groups are already pending deletion, then those records are + left as is. + """ - await self.db_pool.simple_upsert_many( - table="state_groups_pending_deletion", - key_names=("state_group",), - key_values=[(state_group,) for state_group in state_groups], - value_names=("insertion_ts",), - value_values=[(now,) for _ in state_groups], - desc="mark_state_groups_as_pending_deletion", + await self.db_pool.runInteraction( + "mark_state_groups_as_pending_deletion", + self._mark_state_groups_as_pending_deletion_txn, + state_groups, ) + def _mark_state_groups_as_pending_deletion_txn( + self, + txn: LoggingTransaction, + state_groups: Collection[int], + ) -> None: + sql = """ + INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) + VALUES %s + ON CONFLICT (state_group) + DO NOTHING + """ + + now = self._clock.time_msec() + rows = [(state_group,now,) for state_group in state_groups] + if isinstance(txn.database_engine, PostgresEngine): + txn.execute_values(sql % ("?",), rows, fetch=False) + else: + txn.execute_batch(sql % ("(?, ?)",), rows) + async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None: """Mark the given state groups as now being referenced""" From 390079164b8157c08c184b36fbabc1de6da0c73b Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 10:36:23 -0700 Subject: [PATCH 12/43] Fix linter errors --- synapse/storage/databases/state/deletion.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py index 4fe4058d164..f77c46f6ae9 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py @@ -334,7 +334,7 @@ async def mark_state_groups_as_pending_deletion( ) def _mark_state_groups_as_pending_deletion_txn( - self, + self, txn: LoggingTransaction, state_groups: Collection[int], ) -> None: @@ -346,7 +346,13 @@ def _mark_state_groups_as_pending_deletion_txn( """ now = self._clock.time_msec() - rows = [(state_group,now,) for state_group in state_groups] + rows = [ + ( + state_group, + now, + ) + for state_group in state_groups + ] if isinstance(txn.database_engine, PostgresEngine): txn.execute_values(sql % ("?",), rows, fetch=False) else: From fe1df20ba5981bfd5d9362e1877e2c283075bf90 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 15:05:27 -0700 Subject: [PATCH 13/43] Move state group deletion job to background updates --- docs/development/database_schema.md | 2 +- synapse/storage/controllers/purge_events.py | 54 --------- synapse/storage/databases/main/state.py | 2 +- synapse/storage/databases/state/bg_updates.py | 104 ++++++++++++++++++ synapse/storage/schema/__init__.py | 5 +- .../01_delete_unreferenced_state_groups.sql | 16 +++ synapse/types/storage/__init__.py | 4 + tests/storage/test_purge.py | 27 +++-- 8 files changed, 150 insertions(+), 64 deletions(-) create mode 100644 synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql diff --git a/docs/development/database_schema.md b/docs/development/database_schema.md index 37a06acc128..620d1c16b00 100644 --- a/docs/development/database_schema.md +++ b/docs/development/database_schema.md @@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor * Whether the update requires a previous update to be complete. * A rough ordering for which to complete updates. -A new background updates needs to be added to the `background_updates` table: +A new background update needs to be added to the `background_updates` table: ```sql INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 6b9b6004f06..47cec8c469d 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -36,8 +36,6 @@ class PurgeEventsStorageController: """High level interface for purging rooms and event history.""" - CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS = 60 * 1000 - def __init__(self, hs: "HomeServer", stores: Databases): self.stores = stores @@ -46,13 +44,6 @@ def __init__(self, hs: "HomeServer", stores: Databases): self._delete_state_groups_loop, 60 * 1000 ) - self._last_checked_state_group = 0 - if hs.config.worker.run_background_tasks: - self._clear_unreferenced_state_loop_call = hs.get_clock().looping_call( - self._clear_unreferenced_state_groups_loop, - self.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS, - ) - async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" @@ -212,48 +203,3 @@ async def _delete_state_groups( room_id, groups_to_sequences, ) - - @wrap_as_background_process("_clear_unreferenced_state_groups_loop") - async def _clear_unreferenced_state_groups_loop(self) -> None: - """Background task that deletes any state groups that may be pending - deletion.""" - - # Look for state groups that can be cleaned up. - search_limit = 100 - next_set = await self.stores.main.get_state_groups( - self._last_checked_state_group + 1, search_limit - ) - if len(next_set) < search_limit: - self._last_checked_state_group = 0 - else: - self._last_checked_state_group = max(next_set) - - if len(next_set) == 0: - return - - # Discard any state groups referenced directly by an event... - referenced = await self.stores.main.get_referenced_state_groups(next_set) - next_set -= referenced - - if len(next_set) == 0: - return - - # ... and discard any that are referenced by other state groups. - next_state_groups = await self.stores.state.get_next_state_groups(next_set) - next_set.difference_update(next_state_groups.values()) - - if len(next_set) == 0: - return - - # Find all state groups that can be deleted if the original set is deleted. - # This set includes the original set, as well as any state groups that would - # become unreferenced upon deleting the original set. - to_delete = await self._find_unreferenced_groups(next_set) - - if len(to_delete) == 0: - return - - # Mark the state groups for deletion by the deletion background task. - await self.stores.state_deletion.mark_state_groups_as_pending_deletion( - to_delete - ) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index c5ec58217b9..9ce7fd292c5 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -704,7 +704,7 @@ def _get_state_groups_txn( ) -> Set[int]: sql = f""" SELECT id FROM state_groups - WHERE id > {initial_state_group} + WHERE id >= {initial_state_group} ORDER BY id LIMIT {limit} """ diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index f7824cba0f2..a308cff7b75 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -32,6 +32,7 @@ from synapse.storage.engines import PostgresEngine from synapse.types import MutableStateMap, StateMap from synapse.types.state import StateFilter +from synapse.types.storage import _BackgroundUpdates from synapse.util.caches import intern_string if TYPE_CHECKING: @@ -348,6 +349,10 @@ def __init__( table="local_current_membership", columns=["event_stream_ordering"], ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, + self._background_delete_unrefereneced_state_groups, + ) async def _background_deduplicate_state( self, progress: dict, batch_size: int @@ -524,3 +529,102 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None: ) return 1 + + async def _background_delete_unrefereneced_state_groups( + self, progress: dict, batch_size: int + ) -> int: + """This background update will slowly delete any unreferenced state groups""" + + last_checked_state_group = progress.get("last_checked_state_group") + + if last_checked_state_group is None: + # This is the first run. + last_checked_state_group = 0 + + ( + last_checked_state_group, + final_batch, + ) = await self._delete_unreferenced_state_groups_batch( + last_checked_state_group, batch_size + ) + + if not final_batch: + # There are more state groups to check. + progress = { + "last_checked_state_group": last_checked_state_group, + } + await self.db_pool.updates._background_update_progress( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, + progress, + ) + else: + # This background process is finished. + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE + ) + + return batch_size + + async def _delete_unreferenced_state_groups_batch( + self, last_checked_state_group: int, batch_size: int + ) -> tuple[int, bool]: + """Looks for unreferenced state groups starting from the last state group + checked, and any state groups which would become unreferenced if a state group + was deleted, and marks them for deletion. + + Args: + last_checked_state_group: The last state group that was checked. + batch_size: How many state groups to process in this iteration. + + Returns: + (last_checked_state_group, final_batch) + """ + + # Look for state groups that can be cleaned up. + next_set = await self.hs.get_datastores().main.get_state_groups( + last_checked_state_group + 1, batch_size + ) + + final_batch = False + if len(next_set) < batch_size: + final_batch = True + else: + last_checked_state_group = max(next_set) + + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # Discard any state groups referenced directly by an event... + referenced = await self.hs.get_datastores().main.get_referenced_state_groups( + next_set + ) + next_set -= referenced + + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # ... and discard any that are referenced by other state groups. + next_state_groups = await self.hs.get_datastores().state.get_next_state_groups( + next_set + ) + next_set.difference_update(next_state_groups.values()) + + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # Find all state groups that can be deleted if the original set is deleted. + # This set includes the original set, as well as any state groups that would + # become unreferenced upon deleting the original set. + to_delete = await self.hs.get_storage_controllers().purge_events._find_unreferenced_groups( + next_set + ) + + if len(to_delete) == 0: + return last_checked_state_group, final_batch + + # Mark the state groups for deletion by the deletion background task. + await self.hs.get_datastores().state_deletion.mark_state_groups_as_pending_deletion( + to_delete + ) + + return last_checked_state_group, final_batch diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 49e648a92fd..c577b8d95fe 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 89 # remember to update the list below when updating +SCHEMA_VERSION = 90 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -158,6 +158,9 @@ Changes in SCHEMA_VERSION = 89 - Add `state_groups_pending_deletion` and `state_groups_persisting` tables. + +Changes in SCHEMA_VERSION = 90 + - Add background update to delete unreferenced state groups. """ diff --git a/synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql b/synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql new file mode 100644 index 00000000000..46b09ffac3e --- /dev/null +++ b/synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql @@ -0,0 +1,16 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +-- Add a background update to delete any unreferenced state groups +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (9001, 'delete_unreferenced_state_groups_bg_update', '{}'); diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py index b5fa20a41a5..d0a85ef2080 100644 --- a/synapse/types/storage/__init__.py +++ b/synapse/types/storage/__init__.py @@ -48,3 +48,7 @@ class _BackgroundUpdates: SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = ( "sliding_sync_membership_snapshots_fix_forgotten_column_bg_update" ) + + DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE = ( + "delete_unreferenced_state_groups_bg_update" + ) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index bb63e5e4510..f4263f93d1f 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -24,6 +24,7 @@ from synapse.rest.client import room from synapse.server import HomeServer from synapse.types.state import StateFilter +from synapse.types.storage import _BackgroundUpdates from synapse.util import Clock from tests.unittest import HomeserverTestCase @@ -278,6 +279,12 @@ def test_clear_unreferenced_state_groups(self) -> None: state1 = self.helper.send_state( self.room_id, "org.matrix.test", body={"number": 2} ) + # Create enough state events to require multiple batches of + # delete_unreferenced_state_groups_bg_update to be run. + for i in range(200): + self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": i} + ) state2 = self.helper.send_state( self.room_id, "org.matrix.test", body={"number": 3} ) @@ -309,14 +316,20 @@ def test_clear_unreferenced_state_groups(self) -> None: ) ) - # Advance so that the background job to clear unreferenced state groups runs - self.reactor.advance( - 1 - + self._storage_controllers.purge_events.CLEAR_UNREFERENCED_STATE_GROUPS_PERIOD_MS - / 1000 + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, + "progress_json": "{}", + }, + ) ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() - # Advance so that the background jobs to delete the state groups runs + # Advance so that the background job to delete the state groups runs self.reactor.advance( 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000 ) @@ -355,4 +368,4 @@ def test_clear_unreferenced_state_groups(self) -> None: desc="test_purge_unreferenced_state_group", ) ) - self.assertEqual(len(state_groups), 8) + self.assertEqual(len(state_groups), 208) From cc9e33b422261ada85e6635ce715afccbaeaa716 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 15:09:35 -0700 Subject: [PATCH 14/43] Fix linter error --- tests/storage/test_purge.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index f4263f93d1f..7ad0c2e932d 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -282,9 +282,7 @@ def test_clear_unreferenced_state_groups(self) -> None: # Create enough state events to require multiple batches of # delete_unreferenced_state_groups_bg_update to be run. for i in range(200): - self.helper.send_state( - self.room_id, "org.matrix.test", body={"number": i} - ) + self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i}) state2 = self.helper.send_state( self.room_id, "org.matrix.test", body={"number": 3} ) From 801ca86833808b0203e4cabab83e35306104740e Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 16:36:53 -0700 Subject: [PATCH 15/43] Pull over all the db calls since that's what it wants... --- synapse/storage/databases/main/state.py | 40 ---- synapse/storage/databases/state/bg_updates.py | 223 +++++++++++++++--- 2 files changed, 186 insertions(+), 77 deletions(-) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 9ce7fd292c5..788f7d1e325 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -675,46 +675,6 @@ async def get_referenced_state_groups( return {row[0] for row in rows} - async def get_state_groups( - self, - initial_state_group: int, - limit: int, - ) -> Set[int]: - """Get a list of stored state groups - - Args: - initial_state_group: get state groups starting with this one. - limit: the maximum number of state groups to return. - - Returns: - The set of stored state groups following the initial_state_group. - """ - return await self.db_pool.runInteraction( - "get_state_groups", - self._get_state_groups_txn, - initial_state_group, - limit, - ) - - def _get_state_groups_txn( - self, - txn: LoggingTransaction, - initial_state_group: int, - limit: int, - ) -> Set[int]: - sql = f""" - SELECT id FROM state_groups - WHERE id >= {initial_state_group} - ORDER BY id - LIMIT {limit} - """ - - txn.execute(sql) - - rows = txn.fetchall() - - return {row[0] for row in rows} - async def update_state_for_partial_state_event( self, event: EventBase, diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index a308cff7b75..4560ebc26cf 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -19,8 +19,20 @@ # # +import itertools import logging -from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + List, + Mapping, + Optional, + Set, + Tuple, + Union, + cast, +) from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore @@ -580,51 +592,188 @@ async def _delete_unreferenced_state_groups_batch( (last_checked_state_group, final_batch) """ - # Look for state groups that can be cleaned up. - next_set = await self.hs.get_datastores().main.get_state_groups( - last_checked_state_group + 1, batch_size - ) + def cleanup_txn( + txn: LoggingTransaction, last_checked_state_group: int, batch_size: int + ) -> Tuple[int, bool]: + # Look for state groups that can be cleaned up. + txn.execute(f""" + SELECT id FROM state_groups + WHERE id > {last_checked_state_group} + ORDER BY id + LIMIT {batch_size} + """) + + rows = txn.fetchall() + next_set = {row[0] for row in rows} + + final_batch = False + if len(next_set) < batch_size: + final_batch = True + else: + last_checked_state_group = max(next_set) - final_batch = False - if len(next_set) < batch_size: - final_batch = True - else: - last_checked_state_group = max(next_set) + if len(next_set) == 0: + return last_checked_state_group, final_batch - if len(next_set) == 0: - return last_checked_state_group, final_batch + # Discard any state groups referenced directly by an event... + rows = cast( + List[Tuple[int]], + self.db_pool.simple_select_many_txn( + txn, + table="event_to_state_groups", + column="state_group", + iterable=next_set, + keyvalues={}, + retcols=("DISTINCT state_group",), + ), + ) - # Discard any state groups referenced directly by an event... - referenced = await self.hs.get_datastores().main.get_referenced_state_groups( - next_set - ) - next_set -= referenced + referenced = {row[0] for row in rows} + next_set -= referenced - if len(next_set) == 0: - return last_checked_state_group, final_batch + if len(next_set) == 0: + return last_checked_state_group, final_batch - # ... and discard any that are referenced by other state groups. - next_state_groups = await self.hs.get_datastores().state.get_next_state_groups( - next_set - ) - next_set.difference_update(next_state_groups.values()) + # ... and discard any that are referenced by other state groups. + rows = cast( + List[Tuple[int, int]], + self.db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=next_set, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + ), + ) - if len(next_set) == 0: - return last_checked_state_group, final_batch + next_state_groups = dict(rows) + next_set.difference_update(next_state_groups.values()) - # Find all state groups that can be deleted if the original set is deleted. - # This set includes the original set, as well as any state groups that would - # become unreferenced upon deleting the original set. - to_delete = await self.hs.get_storage_controllers().purge_events._find_unreferenced_groups( - next_set - ) + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # Find all state groups that can be deleted if the original set is deleted. + # This set includes the original set, as well as any state groups that would + # become unreferenced upon deleting the original set. + to_delete = self._find_unreferenced_groups(txn, next_set) + + if len(to_delete) == 0: + return last_checked_state_group, final_batch + + # Mark the state groups for deletion by the deletion background task. + sql = """ + INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) + VALUES %s + ON CONFLICT (state_group) + DO NOTHING + """ + + now = self._clock.time_msec() + rows = [ + ( + state_group, + now, + ) + for state_group in to_delete + ] + if isinstance(txn.database_engine, PostgresEngine): + txn.execute_values(sql % ("?",), rows, fetch=False) + else: + txn.execute_batch(sql % ("(?, ?)",), rows) - if len(to_delete) == 0: return last_checked_state_group, final_batch - # Mark the state groups for deletion by the deletion background task. - await self.hs.get_datastores().state_deletion.mark_state_groups_as_pending_deletion( - to_delete + return await self.db_pool.runInteraction( + "mark_unreferenced_state_groups_for_deletion", + cleanup_txn, + last_checked_state_group, + batch_size, ) - return last_checked_state_group, final_batch + def _find_unreferenced_groups( + self, txn: LoggingTransaction, state_groups: Collection[int] + ) -> Set[int]: + """Used when purging history to figure out which state groups can be + deleted. + + Args: + state_groups: Set of state groups referenced by events + that are going to be deleted. + + Returns: + The set of state groups that can be deleted. + """ + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + rows = self.db_pool.simple_select_many_txn( + txn, + table="event_to_state_groups", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("DISTINCT state_group",), + ) + + referenced = {row[0] for row in rows} + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + rows = self.db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + ) + + edges = dict(rows) + + prevs = set(edges.values()) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + # We also check to see if anything referencing the state groups are + # also unreferenced. This helps ensure that we delete unreferenced + # state groups, if we don't then we will de-delta them when we + # delete the other state groups leading to increased DB usage. + rows = self.db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + ) + + next_edges = dict(rows) + nexts = set(next_edges.keys()) + nexts -= state_groups_seen + next_to_search |= nexts + state_groups_seen |= nexts + + to_delete = state_groups_seen - referenced_groups + + return to_delete From ccb21582ed86404dfa3b20dac6f4aadba78af763 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 17:09:59 -0700 Subject: [PATCH 16/43] Try OVERRIDING SYSTEM VALUE --- synapse/_scripts/synapse_port_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 3f67a739a06..1e7cbcf8403 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -271,7 +271,7 @@ def r(txn: LoggingTransaction) -> List[Tuple]: def insert_many_txn( self, txn: LoggingTransaction, table: str, headers: List[str], rows: List[Tuple] ) -> None: - sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + sql = "INSERT INTO %s (%s) VALUES (%s) OVERRIDING SYSTEM VALUE" % ( table, ", ".join(k for k in headers), ", ".join("%s" for _ in headers), From ca7ed7604ee67ac2278aae25508688f37cc1cdc0 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 13 Feb 2025 17:17:00 -0700 Subject: [PATCH 17/43] Move OVERRIDING SYSTEM VALUE --- synapse/_scripts/synapse_port_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 1e7cbcf8403..ff5c150fd6c 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -271,7 +271,7 @@ def r(txn: LoggingTransaction) -> List[Tuple]: def insert_many_txn( self, txn: LoggingTransaction, table: str, headers: List[str], rows: List[Tuple] ) -> None: - sql = "INSERT INTO %s (%s) VALUES (%s) OVERRIDING SYSTEM VALUE" % ( + sql = "INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE VALUES (%s)" % ( table, ", ".join(k for k in headers), ", ".join("%s" for _ in headers), From f45dcb1a0c05ceabe1a54a3c25cbb96fb11ee947 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 16:46:02 +0000 Subject: [PATCH 18/43] Update synapse/storage/databases/state/bg_updates.py Co-authored-by: Erik Johnston --- synapse/storage/databases/state/bg_updates.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 4560ebc26cf..0b054f199c7 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -603,8 +603,7 @@ def cleanup_txn( LIMIT {batch_size} """) - rows = txn.fetchall() - next_set = {row[0] for row in rows} + next_set = {row[0] for row in txn} final_batch = False if len(next_set) < batch_size: From 5e05af276620c3c862447d23b88df5cd5cdd777f Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 16:46:20 +0000 Subject: [PATCH 19/43] Update synapse/storage/databases/state/bg_updates.py Co-authored-by: Erik Johnston --- synapse/storage/databases/state/bg_updates.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 0b054f199c7..3d1b34c9358 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -662,10 +662,10 @@ def cleanup_txn( # Mark the state groups for deletion by the deletion background task. sql = """ - INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) - VALUES %s - ON CONFLICT (state_group) - DO NOTHING + INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) + VALUES %s + ON CONFLICT (state_group) + DO NOTHING """ now = self._clock.time_msec() From 7d1ce8d855e35a30d151177c8a7ed3b44b422f5e Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 10:35:09 -0700 Subject: [PATCH 20/43] Review comments & cleanup --- synapse/storage/databases/state/bg_updates.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 3d1b34c9358..6432587674e 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -627,27 +627,25 @@ def cleanup_txn( ), ) - referenced = {row[0] for row in rows} - next_set -= referenced + next_set.difference_update({row[0] for row in rows}) if len(next_set) == 0: return last_checked_state_group, final_batch # ... and discard any that are referenced by other state groups. rows = cast( - List[Tuple[int, int]], + List[Tuple[int]], self.db_pool.simple_select_many_txn( txn, table="state_group_edges", column="prev_state_group", iterable=next_set, keyvalues={}, - retcols=("state_group", "prev_state_group"), + retcols=("DISTINCT prev_state_group",), ), ) - next_state_groups = dict(rows) - next_set.difference_update(next_state_groups.values()) + next_set.difference_update(row[0] for row in rows) if len(next_set) == 0: return last_checked_state_group, final_batch @@ -669,7 +667,7 @@ def cleanup_txn( """ now = self._clock.time_msec() - rows = [ + deletion_rows = [ ( state_group, now, @@ -677,9 +675,9 @@ def cleanup_txn( for state_group in to_delete ] if isinstance(txn.database_engine, PostgresEngine): - txn.execute_values(sql % ("?",), rows, fetch=False) + txn.execute_values(sql % ("?",), deletion_rows, fetch=False) else: - txn.execute_batch(sql % ("(?, ?)",), rows) + txn.execute_batch(sql % ("(?, ?)",), deletion_rows) return last_checked_state_group, final_batch From 3c50f71ac140da8ccd3fabed9d07a7be39cc4687 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 10:47:02 -0700 Subject: [PATCH 21/43] No string interpolation for sql --- synapse/storage/databases/state/bg_updates.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 6432587674e..d71138b77fd 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -596,12 +596,10 @@ def cleanup_txn( txn: LoggingTransaction, last_checked_state_group: int, batch_size: int ) -> Tuple[int, bool]: # Look for state groups that can be cleaned up. - txn.execute(f""" - SELECT id FROM state_groups - WHERE id > {last_checked_state_group} - ORDER BY id - LIMIT {batch_size} - """) + state_group_sql = ( + "SELECT id FROM state_groups WHERE id > ? ORDER BY id LIMIT ?" + ) + txn.execute(state_group_sql, (last_checked_state_group, batch_size)) next_set = {row[0] for row in txn} From 7f611e0b588e3ec6f819b6acf827d35979d3b229 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 10:51:07 -0700 Subject: [PATCH 22/43] Move background task to current schema version --- synapse/storage/schema/__init__.py | 4 +--- .../02_delete_unreferenced_state_groups.sql} | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) rename synapse/storage/schema/state/delta/{90/01_delete_unreferenced_state_groups.sql => 89/02_delete_unreferenced_state_groups.sql} (91%) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index c577b8d95fe..c90c2c60515 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 90 # remember to update the list below when updating +SCHEMA_VERSION = 89 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -158,8 +158,6 @@ Changes in SCHEMA_VERSION = 89 - Add `state_groups_pending_deletion` and `state_groups_persisting` tables. - -Changes in SCHEMA_VERSION = 90 - Add background update to delete unreferenced state groups. """ diff --git a/synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql b/synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql similarity index 91% rename from synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql rename to synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql index 46b09ffac3e..184dc8564c0 100644 --- a/synapse/storage/schema/state/delta/90/01_delete_unreferenced_state_groups.sql +++ b/synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql @@ -13,4 +13,4 @@ -- Add a background update to delete any unreferenced state groups INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (9001, 'delete_unreferenced_state_groups_bg_update', '{}'); + (8902, 'delete_unreferenced_state_groups_bg_update', '{}'); From 21dc067a8f2cbdc1c0063438ffecbb0c03eaff74 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 13:44:43 -0700 Subject: [PATCH 23/43] Comment ignoring table port --- synapse/_scripts/synapse_port_db.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index ff5c150fd6c..7534794f6bb 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -191,6 +191,11 @@ IGNORED_TABLES = { + # Porting the auto generated sequence in this table is non-trivial. + # And anything not ported, will get automatically added back by the + # `delete_unreferenced_state_groups_bg_update` background task. + # This makes it safe to ignore porting this table. + "state_groups_pending_deletion", # We don't port these tables, as they're a faff and we can regenerate # them anyway. "user_directory", @@ -271,7 +276,7 @@ def r(txn: LoggingTransaction) -> List[Tuple]: def insert_many_txn( self, txn: LoggingTransaction, table: str, headers: List[str], rows: List[Tuple] ) -> None: - sql = "INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE VALUES (%s)" % ( + sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, ", ".join(k for k in headers), ", ".join("%s" for _ in headers), From 09a817f1f004074cc8979d0f14e9db05b0f0a98f Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 14:37:32 -0700 Subject: [PATCH 24/43] Deduplicate find_unreferenced_groups --- synapse/storage/controllers/purge_events.py | 73 +------- synapse/storage/databases/state/bg_updates.py | 157 +++++++++--------- 2 files changed, 87 insertions(+), 143 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 47cec8c469d..de97b780d66 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -19,13 +19,13 @@ # # -import itertools import logging -from typing import TYPE_CHECKING, Collection, Mapping, Set +from typing import TYPE_CHECKING, Mapping from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.databases import Databases +from synapse.storage.databases.state.bg_updates import find_unreferenced_groups if TYPE_CHECKING: from synapse.server import HomeServer @@ -72,7 +72,12 @@ async def purge_history( ) logger.info("[purge] finding state groups that can be deleted") - sg_to_delete = await self._find_unreferenced_groups(state_groups) + sg_to_delete = await self.stores.state.db_pool.runInteraction( + "find_unreferenced_state_groups", + find_unreferenced_groups, + self.stores.state.db_pool, + state_groups, + ) # Mark these state groups as pending deletion, they will actually # get deleted automatically later. @@ -80,68 +85,6 @@ async def purge_history( sg_to_delete ) - async def _find_unreferenced_groups( - self, state_groups: Collection[int] - ) -> Set[int]: - """Used when purging history to figure out which state groups can be - deleted. - - Args: - state_groups: Set of state groups referenced by events - that are going to be deleted. - - Returns: - The set of state groups that can be deleted. - """ - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. - next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(itertools.islice(next_to_search, 100)) - next_to_search -= current_search - - referenced = await self.stores.main.get_referenced_state_groups( - current_search - ) - referenced_groups |= referenced - - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced - - edges = await self.stores.state.get_previous_state_groups(current_search) - - prevs = set(edges.values()) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - # We also check to see if anything referencing the state groups are - # also unreferenced. This helps ensure that we delete unreferenced - # state groups, if we don't then we will de-delta them when we - # delete the other state groups leading to increased DB usage. - next_edges = await self.stores.state.get_next_state_groups(current_search) - nexts = set(next_edges.keys()) - nexts -= state_groups_seen - next_to_search |= nexts - state_groups_seen |= nexts - - to_delete = state_groups_seen - referenced_groups - - return to_delete - @wrap_as_background_process("_delete_state_groups_loop") async def _delete_state_groups_loop(self) -> None: """Background task that deletes any state groups that may be pending diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index d71138b77fd..6bb15c86450 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -651,7 +651,7 @@ def cleanup_txn( # Find all state groups that can be deleted if the original set is deleted. # This set includes the original set, as well as any state groups that would # become unreferenced upon deleting the original set. - to_delete = self._find_unreferenced_groups(txn, next_set) + to_delete = find_unreferenced_groups(txn, self.db_pool, next_set) if len(to_delete) == 0: return last_checked_state_group, final_batch @@ -686,89 +686,90 @@ def cleanup_txn( batch_size, ) - def _find_unreferenced_groups( - self, txn: LoggingTransaction, state_groups: Collection[int] - ) -> Set[int]: - """Used when purging history to figure out which state groups can be - deleted. - Args: - state_groups: Set of state groups referenced by events - that are going to be deleted. +def find_unreferenced_groups( + txn: LoggingTransaction, db_pool: DatabasePool, state_groups: Collection[int] +) -> Set[int]: + """Used when purging history to figure out which state groups can be + deleted. - Returns: - The set of state groups that can be deleted. - """ - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. - next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(itertools.islice(next_to_search, 100)) - next_to_search -= current_search - - rows = self.db_pool.simple_select_many_txn( - txn, - table="event_to_state_groups", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("DISTINCT state_group",), - ) + Args: + state_groups: Set of state groups referenced by events + that are going to be deleted. - referenced = {row[0] for row in rows} - referenced_groups |= referenced + Returns: + The set of state groups that can be deleted. + """ + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + rows = db_pool.simple_select_many_txn( + txn, + table="event_to_state_groups", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("DISTINCT state_group",), + ) - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced + referenced = {row[0] for row in rows} + referenced_groups |= referenced - rows = self.db_pool.simple_select_many_txn( - txn, - table="state_group_edges", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - ) + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced - edges = dict(rows) - - prevs = set(edges.values()) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - # We also check to see if anything referencing the state groups are - # also unreferenced. This helps ensure that we delete unreferenced - # state groups, if we don't then we will de-delta them when we - # delete the other state groups leading to increased DB usage. - rows = self.db_pool.simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - ) + rows = db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + ) + + edges = dict(rows) + + prevs = set(edges.values()) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + # We also check to see if anything referencing the state groups are + # also unreferenced. This helps ensure that we delete unreferenced + # state groups, if we don't then we will de-delta them when we + # delete the other state groups leading to increased DB usage. + rows = db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + ) - next_edges = dict(rows) - nexts = set(next_edges.keys()) - nexts -= state_groups_seen - next_to_search |= nexts - state_groups_seen |= nexts + next_edges = dict(rows) + nexts = set(next_edges.keys()) + nexts -= state_groups_seen + next_to_search |= nexts + state_groups_seen |= nexts - to_delete = state_groups_seen - referenced_groups + to_delete = state_groups_seen - referenced_groups - return to_delete + return to_delete From 042af6e327026dcb1173f529035d250709403f80 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 15:39:00 -0700 Subject: [PATCH 25/43] Don't reuse variables --- synapse/storage/controllers/purge_events.py | 4 ++-- synapse/storage/databases/state/bg_updates.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index de97b780d66..4be87973d1b 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -72,10 +72,10 @@ async def purge_history( ) logger.info("[purge] finding state groups that can be deleted") - sg_to_delete = await self.stores.state.db_pool.runInteraction( + sg_to_delete = await self.stores.main.db_pool.runInteraction( "find_unreferenced_state_groups", find_unreferenced_groups, - self.stores.state.db_pool, + self.stores.main.db_pool, state_groups, ) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 6bb15c86450..f5be50510a5 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -718,7 +718,7 @@ def find_unreferenced_groups( current_search = set(itertools.islice(next_to_search, 100)) next_to_search -= current_search - rows = db_pool.simple_select_many_txn( + referenced_state_groups = db_pool.simple_select_many_txn( txn, table="event_to_state_groups", column="state_group", @@ -727,14 +727,14 @@ def find_unreferenced_groups( retcols=("DISTINCT state_group",), ) - referenced = {row[0] for row in rows} + referenced = {row[0] for row in referenced_state_groups} referenced_groups |= referenced # We don't continue iterating up the state group graphs for state # groups that are referenced. current_search -= referenced - rows = db_pool.simple_select_many_txn( + next_state_groups = db_pool.simple_select_many_txn( txn, table="state_group_edges", column="state_group", @@ -743,7 +743,7 @@ def find_unreferenced_groups( retcols=("state_group", "prev_state_group"), ) - edges = dict(rows) + edges = dict(next_state_groups) prevs = set(edges.values()) # We don't bother re-handling groups we've already seen @@ -755,7 +755,7 @@ def find_unreferenced_groups( # also unreferenced. This helps ensure that we delete unreferenced # state groups, if we don't then we will de-delta them when we # delete the other state groups leading to increased DB usage. - rows = db_pool.simple_select_many_txn( + previous_state_groups = db_pool.simple_select_many_txn( txn, table="state_group_edges", column="prev_state_group", @@ -764,7 +764,7 @@ def find_unreferenced_groups( retcols=("state_group", "prev_state_group"), ) - next_edges = dict(rows) + next_edges = dict(previous_state_groups) nexts = set(next_edges.keys()) nexts -= state_groups_seen next_to_search |= nexts From 4cae2e5f79c4202dfff2c6c663ef251d39edd3d6 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 16:20:34 -0700 Subject: [PATCH 26/43] Switch to not use single transaction --- synapse/storage/controllers/purge_events.py | 7 +- synapse/storage/databases/state/bg_updates.py | 135 +++++++++--------- 2 files changed, 72 insertions(+), 70 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 4be87973d1b..b966c09364c 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -72,11 +72,8 @@ async def purge_history( ) logger.info("[purge] finding state groups that can be deleted") - sg_to_delete = await self.stores.main.db_pool.runInteraction( - "find_unreferenced_state_groups", - find_unreferenced_groups, - self.stores.main.db_pool, - state_groups, + sg_to_delete = await find_unreferenced_groups( + self.stores.main.db_pool, state_groups ) # Mark these state groups as pending deletion, they will actually diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index f5be50510a5..afc8b67d952 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -592,10 +592,8 @@ async def _delete_unreferenced_state_groups_batch( (last_checked_state_group, final_batch) """ - def cleanup_txn( - txn: LoggingTransaction, last_checked_state_group: int, batch_size: int - ) -> Tuple[int, bool]: - # Look for state groups that can be cleaned up. + # Look for state groups that can be cleaned up. + def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]: state_group_sql = ( "SELECT id FROM state_groups WHERE id > ? ORDER BY id LIMIT ?" ) @@ -603,59 +601,68 @@ def cleanup_txn( next_set = {row[0] for row in txn} - final_batch = False - if len(next_set) < batch_size: - final_batch = True - else: - last_checked_state_group = max(next_set) + return next_set - if len(next_set) == 0: - return last_checked_state_group, final_batch + next_set = await self.db_pool.runInteraction( + "get_next_state_groups", get_next_state_groups_txn + ) - # Discard any state groups referenced directly by an event... - rows = cast( - List[Tuple[int]], - self.db_pool.simple_select_many_txn( - txn, - table="event_to_state_groups", - column="state_group", - iterable=next_set, - keyvalues={}, - retcols=("DISTINCT state_group",), - ), - ) + final_batch = False + if len(next_set) < batch_size: + final_batch = True + else: + last_checked_state_group = max(next_set) - next_set.difference_update({row[0] for row in rows}) + if len(next_set) == 0: + return last_checked_state_group, final_batch - if len(next_set) == 0: - return last_checked_state_group, final_batch + # Discard any state groups referenced directly by an event... + rows = cast( + List[Tuple[int]], + await self.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=next_set, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ), + ) - # ... and discard any that are referenced by other state groups. - rows = cast( - List[Tuple[int]], - self.db_pool.simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=next_set, - keyvalues={}, - retcols=("DISTINCT prev_state_group",), - ), - ) + next_set.difference_update({row[0] for row in rows}) - next_set.difference_update(row[0] for row in rows) + if len(next_set) == 0: + return last_checked_state_group, final_batch - if len(next_set) == 0: - return last_checked_state_group, final_batch + # ... and discard any that are referenced by other state groups. + rows = cast( + List[Tuple[int]], + await self.db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=next_set, + keyvalues={}, + retcols=("DISTINCT prev_state_group",), + desc="get_previous_state_groups", + ), + ) + + next_set.difference_update(row[0] for row in rows) - # Find all state groups that can be deleted if the original set is deleted. - # This set includes the original set, as well as any state groups that would - # become unreferenced upon deleting the original set. - to_delete = find_unreferenced_groups(txn, self.db_pool, next_set) + if len(next_set) == 0: + return last_checked_state_group, final_batch - if len(to_delete) == 0: - return last_checked_state_group, final_batch + # Find all state groups that can be deleted if the original set is deleted. + # This set includes the original set, as well as any state groups that would + # become unreferenced upon deleting the original set. + to_delete = await find_unreferenced_groups(self.db_pool, next_set) + + if len(to_delete) == 0: + return last_checked_state_group, final_batch + def mark_for_deletion_txn( + txn: LoggingTransaction, state_groups: Set[int] + ) -> None: # Mark the state groups for deletion by the deletion background task. sql = """ INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) @@ -670,25 +677,23 @@ def cleanup_txn( state_group, now, ) - for state_group in to_delete + for state_group in state_groups ] + if isinstance(txn.database_engine, PostgresEngine): txn.execute_values(sql % ("?",), deletion_rows, fetch=False) else: txn.execute_batch(sql % ("(?, ?)",), deletion_rows) - return last_checked_state_group, final_batch - - return await self.db_pool.runInteraction( - "mark_unreferenced_state_groups_for_deletion", - cleanup_txn, - last_checked_state_group, - batch_size, + await self.db_pool.runInteraction( + "mark_state_groups_for_deletion", mark_for_deletion_txn, to_delete ) + return last_checked_state_group, final_batch + -def find_unreferenced_groups( - txn: LoggingTransaction, db_pool: DatabasePool, state_groups: Collection[int] +async def find_unreferenced_groups( + db_pool: DatabasePool, state_groups: Collection[int] ) -> Set[int]: """Used when purging history to figure out which state groups can be deleted. @@ -718,13 +723,13 @@ def find_unreferenced_groups( current_search = set(itertools.islice(next_to_search, 100)) next_to_search -= current_search - referenced_state_groups = db_pool.simple_select_many_txn( - txn, + referenced_state_groups = await db_pool.simple_select_many_batch( table="event_to_state_groups", column="state_group", iterable=current_search, keyvalues={}, retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", ) referenced = {row[0] for row in referenced_state_groups} @@ -734,16 +739,16 @@ def find_unreferenced_groups( # groups that are referenced. current_search -= referenced - next_state_groups = db_pool.simple_select_many_txn( - txn, + prev_state_groups = await db_pool.simple_select_many_batch( table="state_group_edges", column="state_group", iterable=current_search, keyvalues={}, retcols=("state_group", "prev_state_group"), + desc="get_previous_state_groups", ) - edges = dict(next_state_groups) + edges = dict(prev_state_groups) prevs = set(edges.values()) # We don't bother re-handling groups we've already seen @@ -755,16 +760,16 @@ def find_unreferenced_groups( # also unreferenced. This helps ensure that we delete unreferenced # state groups, if we don't then we will de-delta them when we # delete the other state groups leading to increased DB usage. - previous_state_groups = db_pool.simple_select_many_txn( - txn, + next_state_groups = await db_pool.simple_select_many_batch( table="state_group_edges", column="prev_state_group", iterable=current_search, keyvalues={}, retcols=("state_group", "prev_state_group"), + desc="get_next_state_groups", ) - next_edges = dict(previous_state_groups) + next_edges = dict(next_state_groups) nexts = set(next_edges.keys()) nexts -= state_groups_seen next_to_search |= nexts From ae367b2b3b5ddbc4894975caa881ddbfe06bc861 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 16:41:32 -0700 Subject: [PATCH 27/43] Try casting --- synapse/storage/databases/state/bg_updates.py | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index afc8b67d952..af6551cb8ab 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -723,13 +723,16 @@ async def find_unreferenced_groups( current_search = set(itertools.islice(next_to_search, 100)) next_to_search -= current_search - referenced_state_groups = await db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", + referenced_state_groups = cast( + List[Tuple[int]], + await db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ), ) referenced = {row[0] for row in referenced_state_groups} @@ -739,13 +742,16 @@ async def find_unreferenced_groups( # groups that are referenced. current_search -= referenced - prev_state_groups = await db_pool.simple_select_many_batch( - table="state_group_edges", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - desc="get_previous_state_groups", + prev_state_groups = cast( + List[Tuple[int, int]], + await db_pool.simple_select_many_batch( + table="state_group_edges", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + desc="get_previous_state_groups", + ), ) edges = dict(prev_state_groups) @@ -760,13 +766,16 @@ async def find_unreferenced_groups( # also unreferenced. This helps ensure that we delete unreferenced # state groups, if we don't then we will de-delta them when we # delete the other state groups leading to increased DB usage. - next_state_groups = await db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - desc="get_next_state_groups", + next_state_groups = cast( + List[Tuple[int, int]], + await db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + desc="get_next_state_groups", + ), ) next_edges = dict(next_state_groups) From 977a8d8320e1fd3b17fe29fabc44d558ce2cc8bc Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 17:03:52 -0700 Subject: [PATCH 28/43] Readd duplication --- synapse/storage/controllers/purge_events.py | 105 +++++++++++++++++++- 1 file changed, 100 insertions(+), 5 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index b966c09364c..bd5d596f24b 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -19,13 +19,13 @@ # # +import itertools import logging -from typing import TYPE_CHECKING, Mapping +from typing import TYPE_CHECKING, Collection, List, Mapping, Set, Tuple, cast from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.databases import Databases -from synapse.storage.databases.state.bg_updates import find_unreferenced_groups if TYPE_CHECKING: from synapse.server import HomeServer @@ -72,9 +72,7 @@ async def purge_history( ) logger.info("[purge] finding state groups that can be deleted") - sg_to_delete = await find_unreferenced_groups( - self.stores.main.db_pool, state_groups - ) + sg_to_delete = await self._find_unreferenced_groups(state_groups) # Mark these state groups as pending deletion, they will actually # get deleted automatically later. @@ -143,3 +141,100 @@ async def _delete_state_groups( room_id, groups_to_sequences, ) + + async def _find_unreferenced_groups( + self, state_groups: Collection[int] + ) -> Set[int]: + """Used when purging history to figure out which state groups can be + deleted. + + Args: + state_groups: Set of state groups referenced by events + that are going to be deleted. + + Returns: + The set of state groups that can be deleted. + """ + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + referenced_state_groups = cast( + List[Tuple[int]], + await self.stores.main.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ), + ) + + referenced = {row[0] for row in referenced_state_groups} + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + prev_state_groups = cast( + List[Tuple[int, int]], + await self.stores.state.db_pool.simple_select_many_batch( + table="state_group_edges", + column="state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + desc="get_previous_state_groups", + ), + ) + + edges = dict(prev_state_groups) + + prevs = set(edges.values()) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + # We also check to see if anything referencing the state groups are + # also unreferenced. This helps ensure that we delete unreferenced + # state groups, if we don't then we will de-delta them when we + # delete the other state groups leading to increased DB usage. + next_state_groups = cast( + List[Tuple[int, int]], + await self.stores.state.db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + desc="get_next_state_groups", + ), + ) + + next_edges = dict(next_state_groups) + nexts = set(next_edges.keys()) + nexts -= state_groups_seen + next_to_search |= nexts + state_groups_seen |= nexts + + to_delete = state_groups_seen - referenced_groups + + return to_delete From d8f920b83b1c0d8e7887db88fa401b525952acf5 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 17:05:45 -0700 Subject: [PATCH 29/43] Put it back in place --- synapse/storage/controllers/purge_events.py | 82 ++++++++++----------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index bd5d596f24b..4758593a97b 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -101,47 +101,6 @@ async def _delete_state_groups_loop(self) -> None: if not made_progress: break - async def _delete_state_groups( - self, room_id: str, groups_to_sequences: Mapping[int, int] - ) -> bool: - """Tries to delete the given state groups. - - Returns: - Whether we made progress in deleting the state groups (or marking - them as referenced). - """ - - # We double check if any of the state groups have become referenced. - # This shouldn't happen, as any usages should cause the state group to - # be removed as pending deletion. - referenced_state_groups = await self.stores.main.get_referenced_state_groups( - groups_to_sequences - ) - - if referenced_state_groups: - # We mark any state groups that have become referenced as being - # used. - await self.stores.state_deletion.mark_state_groups_as_used( - referenced_state_groups - ) - - # Update list of state groups to remove referenced ones - groups_to_sequences = { - state_group: sequence_number - for state_group, sequence_number in groups_to_sequences.items() - if state_group not in referenced_state_groups - } - - if not groups_to_sequences: - # We made progress here as long as we marked some state groups as - # now referenced. - return len(referenced_state_groups) > 0 - - return await self.stores.state.purge_unreferenced_state_groups( - room_id, - groups_to_sequences, - ) - async def _find_unreferenced_groups( self, state_groups: Collection[int] ) -> Set[int]: @@ -238,3 +197,44 @@ async def _find_unreferenced_groups( to_delete = state_groups_seen - referenced_groups return to_delete + + async def _delete_state_groups( + self, room_id: str, groups_to_sequences: Mapping[int, int] + ) -> bool: + """Tries to delete the given state groups. + + Returns: + Whether we made progress in deleting the state groups (or marking + them as referenced). + """ + + # We double check if any of the state groups have become referenced. + # This shouldn't happen, as any usages should cause the state group to + # be removed as pending deletion. + referenced_state_groups = await self.stores.main.get_referenced_state_groups( + groups_to_sequences + ) + + if referenced_state_groups: + # We mark any state groups that have become referenced as being + # used. + await self.stores.state_deletion.mark_state_groups_as_used( + referenced_state_groups + ) + + # Update list of state groups to remove referenced ones + groups_to_sequences = { + state_group: sequence_number + for state_group, sequence_number in groups_to_sequences.items() + if state_group not in referenced_state_groups + } + + if not groups_to_sequences: + # We made progress here as long as we marked some state groups as + # now referenced. + return len(referenced_state_groups) > 0 + + return await self.stores.state.purge_unreferenced_state_groups( + room_id, + groups_to_sequences, + ) From 6582fedddda34c8077b20557c5edda96b409691b Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 17:06:48 -0700 Subject: [PATCH 30/43] Put it back in place --- synapse/storage/controllers/purge_events.py | 43 ++++++++++----------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 4758593a97b..a845ae9b744 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -79,28 +79,6 @@ async def purge_history( await self.stores.state_deletion.mark_state_groups_as_pending_deletion( sg_to_delete ) - - @wrap_as_background_process("_delete_state_groups_loop") - async def _delete_state_groups_loop(self) -> None: - """Background task that deletes any state groups that may be pending - deletion.""" - - while True: - next_to_delete = await self.stores.state_deletion.get_next_state_group_collection_to_delete() - if next_to_delete is None: - break - - (room_id, groups_to_sequences) = next_to_delete - made_progress = await self._delete_state_groups( - room_id, groups_to_sequences - ) - - # If no progress was made in deleting the state groups, then we - # break to allow a pause before trying again next time we get - # called. - if not made_progress: - break - async def _find_unreferenced_groups( self, state_groups: Collection[int] ) -> Set[int]: @@ -198,6 +176,27 @@ async def _find_unreferenced_groups( return to_delete + @wrap_as_background_process("_delete_state_groups_loop") + async def _delete_state_groups_loop(self) -> None: + """Background task that deletes any state groups that may be pending + deletion.""" + + while True: + next_to_delete = await self.stores.state_deletion.get_next_state_group_collection_to_delete() + if next_to_delete is None: + break + + (room_id, groups_to_sequences) = next_to_delete + made_progress = await self._delete_state_groups( + room_id, groups_to_sequences + ) + + # If no progress was made in deleting the state groups, then we + # break to allow a pause before trying again next time we get + # called. + if not made_progress: + break + async def _delete_state_groups( self, room_id: str, groups_to_sequences: Mapping[int, int] ) -> bool: From ecb8ed50fcfb2323dd49c0d3d692aab25132e4a6 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 17:09:01 -0700 Subject: [PATCH 31/43] Fix linter error --- synapse/storage/controllers/purge_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index a845ae9b744..fde202be9fd 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -79,6 +79,7 @@ async def purge_history( await self.stores.state_deletion.mark_state_groups_as_pending_deletion( sg_to_delete ) + async def _find_unreferenced_groups( self, state_groups: Collection[int] ) -> Set[int]: From 8eae7dd751756dcf9b28c2d7a2ec0640c9d3fbf8 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 17:30:24 -0700 Subject: [PATCH 32/43] Use multiple db pools --- synapse/storage/controllers/purge_events.py | 8 +++++++- synapse/storage/databases/state/bg_updates.py | 12 +++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index fde202be9fd..9e3866ba6e8 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -26,6 +26,7 @@ from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.databases import Databases +from synapse.storage.databases.state.bg_updates import find_unreferenced_groups if TYPE_CHECKING: from synapse.server import HomeServer @@ -72,7 +73,12 @@ async def purge_history( ) logger.info("[purge] finding state groups that can be deleted") - sg_to_delete = await self._find_unreferenced_groups(state_groups) + # sg_to_delete = await self._find_unreferenced_groups(state_groups) + sg_to_delete = await find_unreferenced_groups( + self.stores.main.db_pool, + self.stores.state.db_pool, + state_groups, + ) # Mark these state groups as pending deletion, they will actually # get deleted automatically later. diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index af6551cb8ab..515f511bdf0 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -655,7 +655,7 @@ def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]: # Find all state groups that can be deleted if the original set is deleted. # This set includes the original set, as well as any state groups that would # become unreferenced upon deleting the original set. - to_delete = await find_unreferenced_groups(self.db_pool, next_set) + to_delete = await find_unreferenced_groups(self.db_pool, self.db_pool, next_set) if len(to_delete) == 0: return last_checked_state_group, final_batch @@ -693,7 +693,9 @@ def mark_for_deletion_txn( async def find_unreferenced_groups( - db_pool: DatabasePool, state_groups: Collection[int] + main_db_pool: DatabasePool, + state_db_pool: DatabasePool, + state_groups: Collection[int], ) -> Set[int]: """Used when purging history to figure out which state groups can be deleted. @@ -725,7 +727,7 @@ async def find_unreferenced_groups( referenced_state_groups = cast( List[Tuple[int]], - await db_pool.simple_select_many_batch( + await main_db_pool.simple_select_many_batch( table="event_to_state_groups", column="state_group", iterable=current_search, @@ -744,7 +746,7 @@ async def find_unreferenced_groups( prev_state_groups = cast( List[Tuple[int, int]], - await db_pool.simple_select_many_batch( + await state_db_pool.simple_select_many_batch( table="state_group_edges", column="state_group", iterable=current_search, @@ -768,7 +770,7 @@ async def find_unreferenced_groups( # delete the other state groups leading to increased DB usage. next_state_groups = cast( List[Tuple[int, int]], - await db_pool.simple_select_many_batch( + await state_db_pool.simple_select_many_batch( table="state_group_edges", column="prev_state_group", iterable=current_search, From 8ef4a238f5b789b46c3ed19e955f6c44c0a71f02 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 18 Feb 2025 17:48:08 -0700 Subject: [PATCH 33/43] Remove duplication again --- synapse/storage/controllers/purge_events.py | 100 +------------------- 1 file changed, 1 insertion(+), 99 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 9e3866ba6e8..71ef5b793c7 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -19,9 +19,8 @@ # # -import itertools import logging -from typing import TYPE_CHECKING, Collection, List, Mapping, Set, Tuple, cast +from typing import TYPE_CHECKING, Mapping from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -86,103 +85,6 @@ async def purge_history( sg_to_delete ) - async def _find_unreferenced_groups( - self, state_groups: Collection[int] - ) -> Set[int]: - """Used when purging history to figure out which state groups can be - deleted. - - Args: - state_groups: Set of state groups referenced by events - that are going to be deleted. - - Returns: - The set of state groups that can be deleted. - """ - - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. - next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(itertools.islice(next_to_search, 100)) - next_to_search -= current_search - - referenced_state_groups = cast( - List[Tuple[int]], - await self.stores.main.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", - ), - ) - - referenced = {row[0] for row in referenced_state_groups} - referenced_groups |= referenced - - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced - - prev_state_groups = cast( - List[Tuple[int, int]], - await self.stores.state.db_pool.simple_select_many_batch( - table="state_group_edges", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - desc="get_previous_state_groups", - ), - ) - - edges = dict(prev_state_groups) - - prevs = set(edges.values()) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - # We also check to see if anything referencing the state groups are - # also unreferenced. This helps ensure that we delete unreferenced - # state groups, if we don't then we will de-delta them when we - # delete the other state groups leading to increased DB usage. - next_state_groups = cast( - List[Tuple[int, int]], - await self.stores.state.db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - desc="get_next_state_groups", - ), - ) - - next_edges = dict(next_state_groups) - nexts = set(next_edges.keys()) - nexts -= state_groups_seen - next_to_search |= nexts - state_groups_seen |= nexts - - to_delete = state_groups_seen - referenced_groups - - return to_delete - @wrap_as_background_process("_delete_state_groups_loop") async def _delete_state_groups_loop(self) -> None: """Background task that deletes any state groups that may be pending From dfa55a9b7afc9dbbef7511abe44c2e5a6cd57315 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Feb 2025 12:26:32 -0700 Subject: [PATCH 34/43] Lift logic to purge events controller --- synapse/storage/controllers/purge_events.py | 213 ++++++++++++++- synapse/storage/databases/state/bg_updates.py | 257 ------------------ 2 files changed, 205 insertions(+), 265 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 71ef5b793c7..d8bf22edf14 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -19,13 +19,23 @@ # # +import itertools import logging -from typing import TYPE_CHECKING, Mapping +from typing import ( + TYPE_CHECKING, + Collection, + List, + Mapping, + Set, + Tuple, + cast, +) from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.storage.database import LoggingTransaction from synapse.storage.databases import Databases -from synapse.storage.databases.state.bg_updates import find_unreferenced_groups +from synapse.types.storage import _BackgroundUpdates if TYPE_CHECKING: from synapse.server import HomeServer @@ -44,6 +54,11 @@ def __init__(self, hs: "HomeServer", stores: Databases): self._delete_state_groups_loop, 60 * 1000 ) + self.stores.state.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, + self._background_delete_unrefereneced_state_groups, + ) + async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" @@ -72,12 +87,7 @@ async def purge_history( ) logger.info("[purge] finding state groups that can be deleted") - # sg_to_delete = await self._find_unreferenced_groups(state_groups) - sg_to_delete = await find_unreferenced_groups( - self.stores.main.db_pool, - self.stores.state.db_pool, - state_groups, - ) + sg_to_delete = await self._find_unreferenced_groups(state_groups) # Mark these state groups as pending deletion, they will actually # get deleted automatically later. @@ -146,3 +156,190 @@ async def _delete_state_groups( room_id, groups_to_sequences, ) + + async def _background_delete_unrefereneced_state_groups( + self, progress: dict, batch_size: int + ) -> int: + """This background update will slowly delete any unreferenced state groups""" + + last_checked_state_group = progress.get("last_checked_state_group") + + if last_checked_state_group is None: + # This is the first run. + last_checked_state_group = 0 + + ( + last_checked_state_group, + final_batch, + ) = await self._delete_unreferenced_state_groups_batch( + last_checked_state_group, batch_size + ) + + if not final_batch: + # There are more state groups to check. + progress = { + "last_checked_state_group": last_checked_state_group, + } + await self.stores.state.db_pool.updates._background_update_progress( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, + progress, + ) + else: + # This background process is finished. + await self.stores.state.db_pool.updates._end_background_update( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE + ) + + return batch_size + + async def _delete_unreferenced_state_groups_batch( + self, last_checked_state_group: int, batch_size: int + ) -> tuple[int, bool]: + """Looks for unreferenced state groups starting from the last state group + checked, and any state groups which would become unreferenced if a state group + was deleted, and marks them for deletion. + + Args: + last_checked_state_group: The last state group that was checked. + batch_size: How many state groups to process in this iteration. + + Returns: + (last_checked_state_group, final_batch) + """ + + # Look for state groups that can be cleaned up. + def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]: + state_group_sql = ( + "SELECT id FROM state_groups WHERE id > ? ORDER BY id LIMIT ?" + ) + txn.execute(state_group_sql, (last_checked_state_group, batch_size)) + + next_set = {row[0] for row in txn} + + return next_set + + next_set = await self.stores.state.db_pool.runInteraction( + "get_next_state_groups", get_next_state_groups_txn + ) + + final_batch = False + if len(next_set) < batch_size: + final_batch = True + else: + last_checked_state_group = max(next_set) + + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # Discard any state groups referenced directly by an event... + rows = cast( + List[Tuple[int]], + await self.stores.main.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=next_set, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ), + ) + + next_set.difference_update({row[0] for row in rows}) + + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # ... and discard any that are referenced by other state groups. + rows = cast( + List[Tuple[int]], + await self.stores.state.db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=next_set, + keyvalues={}, + retcols=("DISTINCT prev_state_group",), + desc="get_previous_state_groups", + ), + ) + + next_set.difference_update(row[0] for row in rows) + + if len(next_set) == 0: + return last_checked_state_group, final_batch + + # Find all state groups that can be deleted if the original set is deleted. + # This set includes the original set, as well as any state groups that would + # become unreferenced upon deleting the original set. + to_delete = await self._find_unreferenced_groups(next_set) + + if len(to_delete) == 0: + return last_checked_state_group, final_batch + + await self.stores.state_deletion.mark_state_groups_as_pending_deletion( + to_delete + ) + + return last_checked_state_group, final_batch + + async def _find_unreferenced_groups( + self, + state_groups: Collection[int], + ) -> Set[int]: + """Used when purging history to figure out which state groups can be + deleted. + + Args: + state_groups: Set of state groups referenced by events + that are going to be deleted. + + Returns: + The set of state groups that can be deleted. + """ + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + referenced = await self.stores.main.get_referenced_state_groups( + current_search + ) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + edges = await self.stores.state.get_previous_state_groups(current_search) + + prevs = set(edges.values()) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + # We also check to see if anything referencing the state groups are + # also unreferenced. This helps ensure that we delete unreferenced + # state groups, if we don't then we will de-delta them when we + # delete the other state groups leading to increased DB usage. + next_edges = await self.stores.state.get_next_state_groups(current_search) + nexts = set(next_edges.keys()) + nexts -= state_groups_seen + next_to_search |= nexts + state_groups_seen |= nexts + + to_delete = state_groups_seen - referenced_groups + + return to_delete diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 515f511bdf0..95fd0ae73ae 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -19,19 +19,15 @@ # # -import itertools import logging from typing import ( TYPE_CHECKING, - Collection, Dict, List, Mapping, Optional, - Set, Tuple, Union, - cast, ) from synapse.logging.opentracing import tag_args, trace @@ -44,7 +40,6 @@ from synapse.storage.engines import PostgresEngine from synapse.types import MutableStateMap, StateMap from synapse.types.state import StateFilter -from synapse.types.storage import _BackgroundUpdates from synapse.util.caches import intern_string if TYPE_CHECKING: @@ -361,10 +356,6 @@ def __init__( table="local_current_membership", columns=["event_stream_ordering"], ) - self.db_pool.updates.register_background_update_handler( - _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, - self._background_delete_unrefereneced_state_groups, - ) async def _background_deduplicate_state( self, progress: dict, batch_size: int @@ -541,251 +532,3 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None: ) return 1 - - async def _background_delete_unrefereneced_state_groups( - self, progress: dict, batch_size: int - ) -> int: - """This background update will slowly delete any unreferenced state groups""" - - last_checked_state_group = progress.get("last_checked_state_group") - - if last_checked_state_group is None: - # This is the first run. - last_checked_state_group = 0 - - ( - last_checked_state_group, - final_batch, - ) = await self._delete_unreferenced_state_groups_batch( - last_checked_state_group, batch_size - ) - - if not final_batch: - # There are more state groups to check. - progress = { - "last_checked_state_group": last_checked_state_group, - } - await self.db_pool.updates._background_update_progress( - _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, - progress, - ) - else: - # This background process is finished. - await self.db_pool.updates._end_background_update( - _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE - ) - - return batch_size - - async def _delete_unreferenced_state_groups_batch( - self, last_checked_state_group: int, batch_size: int - ) -> tuple[int, bool]: - """Looks for unreferenced state groups starting from the last state group - checked, and any state groups which would become unreferenced if a state group - was deleted, and marks them for deletion. - - Args: - last_checked_state_group: The last state group that was checked. - batch_size: How many state groups to process in this iteration. - - Returns: - (last_checked_state_group, final_batch) - """ - - # Look for state groups that can be cleaned up. - def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]: - state_group_sql = ( - "SELECT id FROM state_groups WHERE id > ? ORDER BY id LIMIT ?" - ) - txn.execute(state_group_sql, (last_checked_state_group, batch_size)) - - next_set = {row[0] for row in txn} - - return next_set - - next_set = await self.db_pool.runInteraction( - "get_next_state_groups", get_next_state_groups_txn - ) - - final_batch = False - if len(next_set) < batch_size: - final_batch = True - else: - last_checked_state_group = max(next_set) - - if len(next_set) == 0: - return last_checked_state_group, final_batch - - # Discard any state groups referenced directly by an event... - rows = cast( - List[Tuple[int]], - await self.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=next_set, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", - ), - ) - - next_set.difference_update({row[0] for row in rows}) - - if len(next_set) == 0: - return last_checked_state_group, final_batch - - # ... and discard any that are referenced by other state groups. - rows = cast( - List[Tuple[int]], - await self.db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=next_set, - keyvalues={}, - retcols=("DISTINCT prev_state_group",), - desc="get_previous_state_groups", - ), - ) - - next_set.difference_update(row[0] for row in rows) - - if len(next_set) == 0: - return last_checked_state_group, final_batch - - # Find all state groups that can be deleted if the original set is deleted. - # This set includes the original set, as well as any state groups that would - # become unreferenced upon deleting the original set. - to_delete = await find_unreferenced_groups(self.db_pool, self.db_pool, next_set) - - if len(to_delete) == 0: - return last_checked_state_group, final_batch - - def mark_for_deletion_txn( - txn: LoggingTransaction, state_groups: Set[int] - ) -> None: - # Mark the state groups for deletion by the deletion background task. - sql = """ - INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) - VALUES %s - ON CONFLICT (state_group) - DO NOTHING - """ - - now = self._clock.time_msec() - deletion_rows = [ - ( - state_group, - now, - ) - for state_group in state_groups - ] - - if isinstance(txn.database_engine, PostgresEngine): - txn.execute_values(sql % ("?",), deletion_rows, fetch=False) - else: - txn.execute_batch(sql % ("(?, ?)",), deletion_rows) - - await self.db_pool.runInteraction( - "mark_state_groups_for_deletion", mark_for_deletion_txn, to_delete - ) - - return last_checked_state_group, final_batch - - -async def find_unreferenced_groups( - main_db_pool: DatabasePool, - state_db_pool: DatabasePool, - state_groups: Collection[int], -) -> Set[int]: - """Used when purging history to figure out which state groups can be - deleted. - - Args: - state_groups: Set of state groups referenced by events - that are going to be deleted. - - Returns: - The set of state groups that can be deleted. - """ - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. - next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(itertools.islice(next_to_search, 100)) - next_to_search -= current_search - - referenced_state_groups = cast( - List[Tuple[int]], - await main_db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", - ), - ) - - referenced = {row[0] for row in referenced_state_groups} - referenced_groups |= referenced - - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced - - prev_state_groups = cast( - List[Tuple[int, int]], - await state_db_pool.simple_select_many_batch( - table="state_group_edges", - column="state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - desc="get_previous_state_groups", - ), - ) - - edges = dict(prev_state_groups) - - prevs = set(edges.values()) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - # We also check to see if anything referencing the state groups are - # also unreferenced. This helps ensure that we delete unreferenced - # state groups, if we don't then we will de-delta them when we - # delete the other state groups leading to increased DB usage. - next_state_groups = cast( - List[Tuple[int, int]], - await state_db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("state_group", "prev_state_group"), - desc="get_next_state_groups", - ), - ) - - next_edges = dict(next_state_groups) - nexts = set(next_edges.keys()) - nexts -= state_groups_seen - next_to_search |= nexts - state_groups_seen |= nexts - - to_delete = state_groups_seen - referenced_groups - - return to_delete From 02c2c87639aabf3044c5339780e0be55c72965d3 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Feb 2025 13:47:19 -0700 Subject: [PATCH 35/43] Add IGNORED_BACKGROUND_UPDATES to port_db --- synapse/_scripts/synapse_port_db.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 7534794f6bb..bc54e197612 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -221,6 +221,15 @@ } +# These background updates will not be applied upon creation of the postgres database. +IGNORED_BACKGROUND_UPDATES = { + # Reapplying this background update to the postgres database is unnecessary after + # already having waited for the SQLite database to complete all running background + # updates. + "delete_unreferenced_state_groups_bg_update", +} + + # Error returned by the run function. Used at the top-level part of the script to # handle errors and return codes. end_error: Optional[str] = None @@ -692,6 +701,20 @@ def _is_sqlite_autovacuum_enabled(txn: LoggingTransaction) -> bool: # 0 means off. 1 means full. 2 means incremental. return autovacuum_setting != 0 + async def remove_ignored_background_updates_from_database(self) -> None: + def _remove_delete_unreferenced_state_groups_bg_update( + txn: LoggingTransaction, + ) -> None: + txn.execute( + "DELETE FROM background_updates WHERE update_name IN (%s)", + ", ".join(u for u in IGNORED_BACKGROUND_UPDATES), + ) + + await self.postgres_store.db_pool.runInteraction( + "remove_delete_unreferenced_state_groups_bg_update", + _remove_delete_unreferenced_state_groups_bg_update, + ) + async def run(self) -> None: """Ports the SQLite database to a PostgreSQL database. @@ -737,6 +760,8 @@ async def run(self) -> None: self.hs_config.database.get_single_database() ) + await self.remove_ignored_background_updates_from_database() + await self.run_background_updates_on_postgres() self.progress.set_state("Creating port tables") From 6851eaa5328c604131eee5fa44da4c6f4a7a083b Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Feb 2025 13:54:12 -0700 Subject: [PATCH 36/43] Fix error --- synapse/_scripts/synapse_port_db.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index bc54e197612..d8f1a41e1c6 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -702,17 +702,17 @@ def _is_sqlite_autovacuum_enabled(txn: LoggingTransaction) -> bool: return autovacuum_setting != 0 async def remove_ignored_background_updates_from_database(self) -> None: - def _remove_delete_unreferenced_state_groups_bg_update( + def _remove_delete_unreferenced_state_groups_bg_updates( txn: LoggingTransaction, ) -> None: txn.execute( - "DELETE FROM background_updates WHERE update_name IN (%s)", - ", ".join(u for u in IGNORED_BACKGROUND_UPDATES), + "DELETE FROM background_updates WHERE update_name IN (%s)" + % (", ".join(u for u in IGNORED_BACKGROUND_UPDATES),) ) await self.postgres_store.db_pool.runInteraction( - "remove_delete_unreferenced_state_groups_bg_update", - _remove_delete_unreferenced_state_groups_bg_update, + "remove_delete_unreferenced_state_groups_bg_updates", + _remove_delete_unreferenced_state_groups_bg_updates, ) async def run(self) -> None: From d1ca8c706272189ecbebe37ff74ef46795e06b7f Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Feb 2025 13:58:19 -0700 Subject: [PATCH 37/43] Update comment on ignoring state_groups_pending_deletion --- synapse/_scripts/synapse_port_db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index d8f1a41e1c6..aac27827b8c 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -192,9 +192,9 @@ IGNORED_TABLES = { # Porting the auto generated sequence in this table is non-trivial. - # And anything not ported, will get automatically added back by the - # `delete_unreferenced_state_groups_bg_update` background task. - # This makes it safe to ignore porting this table. + # None of the entries in this list are mandatory for Synapse to keep working. + # If state group disk space is an issue after the port, the + # `delete_unreferenced_state_groups_bg_update` background task can be run again. "state_groups_pending_deletion", # We don't port these tables, as they're a faff and we can regenerate # them anyway. From 0f7c874796c6aa3987dee5b6e144e0dd9f6885d1 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Feb 2025 14:04:01 -0700 Subject: [PATCH 38/43] Try different sql --- synapse/_scripts/synapse_port_db.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index aac27827b8c..87594d3f98b 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -193,7 +193,7 @@ IGNORED_TABLES = { # Porting the auto generated sequence in this table is non-trivial. # None of the entries in this list are mandatory for Synapse to keep working. - # If state group disk space is an issue after the port, the + # If state group disk space is an issue after the port, the # `delete_unreferenced_state_groups_bg_update` background task can be run again. "state_groups_pending_deletion", # We don't port these tables, as they're a faff and we can regenerate @@ -707,12 +707,13 @@ def _remove_delete_unreferenced_state_groups_bg_updates( ) -> None: txn.execute( "DELETE FROM background_updates WHERE update_name IN (%s)" - % (", ".join(u for u in IGNORED_BACKGROUND_UPDATES),) + % (", ".join("%s" for _ in IGNORED_BACKGROUND_UPDATES),) ) await self.postgres_store.db_pool.runInteraction( "remove_delete_unreferenced_state_groups_bg_updates", _remove_delete_unreferenced_state_groups_bg_updates, + IGNORED_BACKGROUND_UPDATES, ) async def run(self) -> None: From 35f15e758960be3ae6bad3693481ebe9bb39f2f5 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Feb 2025 14:12:12 -0700 Subject: [PATCH 39/43] Fixes --- synapse/_scripts/synapse_port_db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 87594d3f98b..ca01fc49205 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -707,13 +707,13 @@ def _remove_delete_unreferenced_state_groups_bg_updates( ) -> None: txn.execute( "DELETE FROM background_updates WHERE update_name IN (%s)" - % (", ".join("%s" for _ in IGNORED_BACKGROUND_UPDATES),) + % (", ".join("%s" for _ in IGNORED_BACKGROUND_UPDATES),), + list(IGNORED_BACKGROUND_UPDATES), ) await self.postgres_store.db_pool.runInteraction( "remove_delete_unreferenced_state_groups_bg_updates", _remove_delete_unreferenced_state_groups_bg_updates, - IGNORED_BACKGROUND_UPDATES, ) async def run(self) -> None: From 89ec2a33a244646fbfeb138e4d016aef12cf8cdc Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Mon, 24 Feb 2025 17:46:08 +0000 Subject: [PATCH 40/43] Update synapse/_scripts/synapse_port_db.py Co-authored-by: Erik Johnston --- synapse/_scripts/synapse_port_db.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index ca01fc49205..094051c5add 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -706,9 +706,8 @@ def _remove_delete_unreferenced_state_groups_bg_updates( txn: LoggingTransaction, ) -> None: txn.execute( - "DELETE FROM background_updates WHERE update_name IN (%s)" - % (", ".join("%s" for _ in IGNORED_BACKGROUND_UPDATES),), - list(IGNORED_BACKGROUND_UPDATES), + "DELETE FROM background_updates WHERE update_name = ANY(?)" + (list(IGNORED_BACKGROUND_UPDATES),), ) await self.postgres_store.db_pool.runInteraction( From f5e59f2329ca8b98aa171da85a5d92cadfeadd39 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Mon, 24 Feb 2025 11:04:50 -0700 Subject: [PATCH 41/43] Fix port_db syntax --- synapse/_scripts/synapse_port_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 094051c5add..59065a05040 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -706,7 +706,7 @@ def _remove_delete_unreferenced_state_groups_bg_updates( txn: LoggingTransaction, ) -> None: txn.execute( - "DELETE FROM background_updates WHERE update_name = ANY(?)" + "DELETE FROM background_updates WHERE update_name = ANY(?)", (list(IGNORED_BACKGROUND_UPDATES),), ) From 92d459da8b88a7945303dcc57fa4b6689ff94a11 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Mon, 24 Feb 2025 12:03:16 -0700 Subject: [PATCH 42/43] Remove unnecessary code --- synapse/storage/controllers/purge_events.py | 39 --------------------- tests/storage/test_purge.py | 2 +- 2 files changed, 1 insertion(+), 40 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index d8bf22edf14..d6c6607a501 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -24,11 +24,8 @@ from typing import ( TYPE_CHECKING, Collection, - List, Mapping, Set, - Tuple, - cast, ) from synapse.logging.context import nested_logging_context @@ -228,42 +225,6 @@ def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]: else: last_checked_state_group = max(next_set) - if len(next_set) == 0: - return last_checked_state_group, final_batch - - # Discard any state groups referenced directly by an event... - rows = cast( - List[Tuple[int]], - await self.stores.main.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=next_set, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", - ), - ) - - next_set.difference_update({row[0] for row in rows}) - - if len(next_set) == 0: - return last_checked_state_group, final_batch - - # ... and discard any that are referenced by other state groups. - rows = cast( - List[Tuple[int]], - await self.stores.state.db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=next_set, - keyvalues={}, - retcols=("DISTINCT prev_state_group",), - desc="get_previous_state_groups", - ), - ) - - next_set.difference_update(row[0] for row in rows) - if len(next_set) == 0: return last_checked_state_group, final_batch diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 7ad0c2e932d..7cf6a323866 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -366,4 +366,4 @@ def test_clear_unreferenced_state_groups(self) -> None: desc="test_purge_unreferenced_state_group", ) ) - self.assertEqual(len(state_groups), 208) + self.assertEqual(len(state_groups), 207) From 5f5f09004f2c9e9cd4f7b5394e5065949279c90a Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Mon, 24 Feb 2025 13:56:23 -0700 Subject: [PATCH 43/43] Only clear state groups up to max from first iteration --- synapse/storage/controllers/purge_events.py | 32 +++++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index d6c6607a501..66af8b39321 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -160,22 +160,38 @@ async def _background_delete_unrefereneced_state_groups( """This background update will slowly delete any unreferenced state groups""" last_checked_state_group = progress.get("last_checked_state_group") + max_state_group = progress.get("max_state_group") - if last_checked_state_group is None: + if last_checked_state_group is None or max_state_group is None: # This is the first run. last_checked_state_group = 0 + max_state_group = await self.stores.state.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={}, + retcol="MAX(id)", + allow_none=True, + desc="get_max_state_group", + ) + if max_state_group is None: + # There are no state groups so the background process is finished. + await self.stores.state.db_pool.updates._end_background_update( + _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE + ) + return batch_size + ( last_checked_state_group, final_batch, ) = await self._delete_unreferenced_state_groups_batch( - last_checked_state_group, batch_size + last_checked_state_group, batch_size, max_state_group ) if not final_batch: # There are more state groups to check. progress = { "last_checked_state_group": last_checked_state_group, + "max_state_group": max_state_group, } await self.stores.state.db_pool.updates._background_update_progress( _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE, @@ -190,7 +206,10 @@ async def _background_delete_unrefereneced_state_groups( return batch_size async def _delete_unreferenced_state_groups_batch( - self, last_checked_state_group: int, batch_size: int + self, + last_checked_state_group: int, + batch_size: int, + max_state_group: int, ) -> tuple[int, bool]: """Looks for unreferenced state groups starting from the last state group checked, and any state groups which would become unreferenced if a state group @@ -206,12 +225,13 @@ async def _delete_unreferenced_state_groups_batch( # Look for state groups that can be cleaned up. def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]: - state_group_sql = ( - "SELECT id FROM state_groups WHERE id > ? ORDER BY id LIMIT ?" + state_group_sql = "SELECT id FROM state_groups WHERE ? < id AND id <= ? ORDER BY id LIMIT ?" + txn.execute( + state_group_sql, (last_checked_state_group, max_state_group, batch_size) ) - txn.execute(state_group_sql, (last_checked_state_group, batch_size)) next_set = {row[0] for row in txn} + logger.error("Next: %s", next_set) return next_set