diff --git a/deploy/bin/sanitise_backup.py b/deploy/bin/sanitise_backup.py index df7a191e..b86a0a2a 100644 --- a/deploy/bin/sanitise_backup.py +++ b/deploy/bin/sanitise_backup.py @@ -34,18 +34,22 @@ def main(backup_file): cur = conn.execute("SELECT name FROM sqlite_schema WHERE type ='table' AND name NOT LIKE 'sqlite_%';") tables = [t[0] for t in cur.fetchall()] + # Find tables that reference user table references_user = [] for table in tables: cur = conn.execute(f"PRAGMA foreign_key_list('{table}')") fks = cur.fetchall() references_user.extend([(fk[3],table) for fk in fks if fk[2]=="opencodelists_user" and fk[4]=="username"]) + # Sanitise user data and related cur = conn.execute("select username from opencodelists_user;") usernames = [row[0] for row in cur.fetchall()] fake_usernames = [] for username in usernames: + # "deferrable" FKs means we can break integrity within a transaction stmts=["BEGIN TRANSACTION"] + # Generate fake username that's not a real username or one we've already had while True: fake_username = fake.user_name() if fake_username not in fake_usernames and fake_username not in usernames: @@ -55,9 +59,11 @@ def main(backup_file): fake_email = f"{username}@example.com" fake_password = hash_password(fake.password()) + # Update FK fields (must be done first) for user_column, table in references_user: stmts.append(f"UPDATE {table} SET {user_column} = '{fake_username}' WHERE {user_column} = '{username}'") + # Update user fields containing personal data stmts.append(f""" UPDATE opencodelists_user SET username = '{fake_username}', @@ -71,15 +77,25 @@ def main(backup_file): conn.executescript(";\n".join(stmts)+";\n") + # Sanitise freetext fields cur = conn.execute("SELECT name, sql FROM sqlite_schema WHERE type ='table' AND name NOT LIKE 'sqlite_%' AND name NOT LIKE 'django_%' AND name NOT LIKE 'versioning%' AND sql LIKE '% text %';") freetext_tables = [(t[0],extract_text_column_names(t[1])) for t in cur.fetchall()] for table, columns in freetext_tables: for column in columns: + # Leave csv and other non-user-provided form data intact if "data" in column: continue stmt = f"UPDATE {table} SET {column} = '[freetext removed]' WHERE COALESCE({column},'') <> '';" conn.execute(stmt) + # Replace API keys + cur = conn.execute("SELECT key FROM authtoken_token;") + keys = [k[0] for k in cur.fetchall()] + for key in keys: + conn.execute( + f"UPDATE authtoken_token SET key = '{secrets.token_hex(20)}' WHERE key = '{key}';" + ) + conn.commit() conn.close() diff --git a/opencodelists/tests/integration/test_sanitise_backup.py b/opencodelists/tests/integration/test_sanitise_backup.py index eab38d0e..7b748faf 100644 --- a/opencodelists/tests/integration/test_sanitise_backup.py +++ b/opencodelists/tests/integration/test_sanitise_backup.py @@ -8,6 +8,21 @@ @pytest.mark.django_db(transaction=True) class TestBackupSanitisation: + def test_api_keys_removed(self, universe, tmp_path): + original_users = [v for v in universe.values() if isinstance(v, User)] + api_tokens = {u.api_token for u in original_users if u.api_token} + + backup_path = backup_db(tmp_path) + sanitise_backup(backup_path) + + conn = sqlite3.connect(backup_path) + cur = conn.execute("SELECT key FROM authtoken_token;") + sanitised_tokens = {k[0] for k in cur.fetchall()} + + assert len(sanitised_tokens) == len(api_tokens) + + assert api_tokens != sanitised_tokens + def test_user_fields_sanitised(self, universe, tmp_path): personal_data_fields = ["username", "email", "name", "password"] original_users = [v for v in universe.values() if isinstance(v, User)]