
Refactor dumps code #3278

Merged: 7 commits on May 22, 2025
Changes from 6 commits
529 changes: 208 additions & 321 deletions listenbrainz/dumps/exporter.py

Large diffs are not rendered by default.

28 changes: 17 additions & 11 deletions listenbrainz/dumps/importer.py
@@ -3,29 +3,33 @@

import sqlalchemy
from flask import current_app
from psycopg2.sql import SQL
from psycopg2.sql import Identifier

from listenbrainz import db
from listenbrainz.db import timescale
from listenbrainz.dumps import DUMP_DEFAULT_THREAD_COUNT, SCHEMA_VERSION_CORE
from listenbrainz.dumps.exceptions import SchemaMismatchException
from listenbrainz.dumps.models import DumpTablesCollection
from listenbrainz.dumps.tables import PRIVATE_TABLES, PRIVATE_TABLES_TIMESCALE, PUBLIC_TABLES_IMPORT, \
PUBLIC_TABLES_TIMESCALE_DUMP, _escape_table_columns
PUBLIC_TABLES_TIMESCALE_DUMP


def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
tables, schema_version: int, threads=DUMP_DEFAULT_THREAD_COUNT):
tables_collection: DumpTablesCollection, schema_version: int, threads=DUMP_DEFAULT_THREAD_COUNT):
""" Import dump present in passed archive path into postgres db.

Arguments:
archive_path: path to the .tar.zst archive to be imported
db_engine: an sqlalchemy Engine instance for making a connection
tables: dict of tables present in the archive with table name as key and
tables_collection: dict of tables present in the archive with table name as key and
columns to import as values
schema_version: the current schema version, to compare against the dumped file
threads (int): the number of threads to use while decompressing, defaults to
db.DUMP_DEFAULT_THREAD_COUNT
"""
file_table_mapping = {
t.filename: t for t in tables_collection.tables
}

zstd_command = ['zstd', '--decompress', '--stdout', archive_path, f'-T{threads}']
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)
@@ -48,15 +52,14 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
current_app.logger.info('Schema version verified.')

else:
if file_name in tables:
current_app.logger.info('Importing data into %s table...', file_name)
if file_name in file_table_mapping:
current_app.logger.info('Importing data from %s...', file_name)
try:
table, fields = _escape_table_columns(file_name, tables[file_name])
query = SQL("COPY {table}({fields}) FROM STDIN").format(fields=fields, table=table)
cursor.copy_expert(query, tar.extractfile(member))
table = file_table_mapping[file_name]
table._import(cursor, tar.extractfile(member))
connection.commit()
except Exception:
current_app.logger.critical('Exception while importing table %s: ', file_name,
current_app.logger.critical('Exception while importing %s: ', file_name,
exc_info=True)
raise

@@ -149,7 +152,10 @@ def import_postgres_dump(private_dump_archive_path=None,
# if the private dump exists and has been imported, we need to
# ignore the sanitized user table in the public dump
# so remove it from tables_to_import
del tables_to_import['user']
tables_to_import.tables = [
t for t in tables_to_import.tables
if t.table_name != Identifier("user")
]

_import_dump(public_dump_archive_path, db.engine, tables_to_import, SCHEMA_VERSION_CORE, threads)
current_app.logger.info('Import of Public dump %s done!', public_dump_archive_path)
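Since listenbrainz/dumps/models.py is not part of the rendered diff, here is a hedged sketch of what the per-table import step above (table._import(cursor, tar.extractfile(member))) presumably does, assuming it keeps the psycopg2 COPY logic that previously lived inline in _import_dump; the standalone function below is illustrative, not the actual method.

# Hypothetical sketch -- the real logic is a method on DumpTable in
# listenbrainz/dumps/models.py, which this diff page does not render.
from typing import IO, Sequence, Union

from psycopg2.sql import SQL, Composable, Identifier


def import_table(cursor, table_name: Identifier,
                 columns: Sequence[Union[str, Composable]], fileobj: IO[bytes]) -> None:
    """COPY one extracted archive member into its target table."""
    # Plain strings are escaped as identifiers; pre-built SQL fragments are
    # passed through unchanged, matching the old _escape_table_columns convention.
    fields = SQL(", ").join(
        Identifier(column) if isinstance(column, str) else column
        for column in columns
    )
    query = SQL("COPY {table}({fields}) FROM STDIN").format(table=table_name, fields=fields)
    cursor.copy_expert(query, fileobj)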
12 changes: 8 additions & 4 deletions listenbrainz/dumps/manager.py
@@ -33,8 +33,7 @@
from listenbrainz.dumps import DUMP_DEFAULT_THREAD_COUNT
from listenbrainz.dumps.check import check_ftp_dump_ages
from listenbrainz.dumps.cleanup import _cleanup_dumps
from listenbrainz.dumps.exporter import dump_postgres_db, create_statistics_dump, dump_timescale_db, \
dump_feedback_for_spark
from listenbrainz.dumps.exporter import create_statistics_dump, dump_feedback_for_spark, dump_database
from listenbrainz.dumps.importer import import_postgres_dump
from listenbrainz.dumps.mapping import create_mapping_dump
from listenbrainz.listenstore import LISTEN_MINIMUM_DATE
@@ -157,14 +156,19 @@ def create_full(location: str, location_private: str, threads: int, dump_id: int
private_dump_path = os.path.join(location_private, dump_name)
create_path(private_dump_path)

locations = {
"public": dump_path,
"private": private_dump_path
}

expected_num_dumps = 0
expected_num_private_dumps = 0
if do_db_dump:
dump_postgres_db(dump_path, private_dump_path, end_time, threads)
dump_database("postgres", locations, end_time, threads)
expected_num_dumps += 1
expected_num_private_dumps += 1
if do_timescale_dump:
dump_timescale_db(dump_path, private_dump_path, end_time, threads)
dump_database("timescale", locations, end_time, threads)
expected_num_dumps += 1
expected_num_private_dumps += 1
if do_listen_dump:
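The new dump_database() entry point called above lives in listenbrainz/dumps/exporter.py, whose diff is collapsed ("Large diffs are not rendered by default"). As a rough, self-contained illustration of the calling convention used here, a database name plus a locations dict with "public" and "private" paths, the stand-in below only mimics the dispatch; the placeholder table definitions and print statements are not the real implementation.

# Illustrative stand-in for dump_database(); the real table collections and dump
# logic live in listenbrainz/dumps/exporter.py and are not shown in this diff.
from datetime import datetime, timezone

PLACEHOLDER_DEFINITIONS = {
    "postgres": {"public": "<public postgres tables>", "private": "<private postgres tables>"},
    "timescale": {"public": "<public timescale tables>", "private": "<private timescale tables>"},
}


def dump_database(database: str, locations: dict, end_time: datetime, threads: int) -> None:
    """Dump the public and private tables of one database into the matching locations."""
    try:
        definitions = PLACEHOLDER_DEFINITIONS[database]
    except KeyError:
        raise ValueError(f"Unknown database: {database}") from None

    for visibility, dump_path in locations.items():   # {"public": ..., "private": ...}
        print(f"Would dump {definitions[visibility]} into {dump_path} "
              f"(cutoff {end_time.isoformat()}, {threads} threads)")


if __name__ == "__main__":
    dump_database("postgres",
                  {"public": "/tmp/dumps/public", "private": "/tmp/dumps/private"},
                  datetime.now(timezone.utc), threads=4)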
183 changes: 61 additions & 122 deletions listenbrainz/dumps/mapping.py
@@ -24,140 +24,79 @@


import os
import shutil
import subprocess
import tarfile
import tempfile
from datetime import datetime

import sqlalchemy
from brainzutils import musicbrainz_db
from flask import current_app
from psycopg2.sql import SQL
from psycopg2.sql import Identifier, SQL

from listenbrainz import DUMP_LICENSE_FILE_PATH
from listenbrainz.db import timescale
from listenbrainz.dumps.tables import _escape_table_columns, PUBLIC_TABLES_MAPPING
from listenbrainz.dumps import DUMP_DEFAULT_THREAD_COUNT
from listenbrainz.dumps.exporter import zstd_dump
from listenbrainz.dumps.models import DumpTable, DumpFormat, DumpTablesCollection, DumpEngineName
from listenbrainz.utils import create_path


def _create_dump(location: str, lb_engine: sqlalchemy.engine.Engine,
mb_engine: sqlalchemy.engine.Engine, tables: dict,
dump_time: datetime):
""" Creates a dump of the provided tables at the location passed

Arguments:
location: the path where the dump should be created
db_engine: an sqlalchemy Engine instance for making a connection
tables: a dict containing the names of the tables to be dumped as keys and the columns
to be dumped as values
dump_time: the time at which the dump process was started

Returns:
the path to the archive file created
"""

archive_name = 'musicbrainz-canonical-dump-{time}'.format(
time=dump_time.strftime('%Y%m%d-%H%M%S')
MAPPING_TABLES = [
Review comment (Member): ohhh, very nice!

DumpTable(
table_name=Identifier("mapping", "canonical_musicbrainz_data"),
filename="canonical_musicbrainz_data.csv",
file_format=DumpFormat.csv,
columns=(
"id",
"artist_credit_id",
SQL("array_to_string(artist_mbids, ',') AS artist_mbids"),
"artist_credit_name",
"release_mbid",
"release_name",
"recording_mbid",
"recording_name",
"combined_lookup",
"score",
)
),
DumpTable(
table_name=Identifier("mapping", "canonical_recording_redirect"),
filename="canonical_recording_redirect.csv",
file_format=DumpFormat.csv,
columns=(
"recording_mbid",
"canonical_recording_mbid",
"canonical_release_mbid"
)
),
DumpTable(
table_name=Identifier("mapping", "canonical_release_redirect"),
filename="canonical_release_redirect.csv",
file_format=DumpFormat.csv,
columns=(
"release_mbid",
"canonical_release_mbid",
"release_group_mbid"
)
)
archive_path = os.path.join(location, '{archive_name}.tar.zst'.format(
archive_name=archive_name,
))

with open(archive_path, 'w') as archive:

zstd_command = ["zstd", "--compress", "-10"]
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)
]

with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:

temp_dir = tempfile.mkdtemp()

try:
timestamp_path = os.path.join(temp_dir, "TIMESTAMP")
with open(timestamp_path, "w") as f:
f.write(dump_time.isoformat(" "))
tar.add(timestamp_path,
arcname=os.path.join(archive_name, "TIMESTAMP"))
tar.add(DUMP_LICENSE_FILE_PATH,
arcname=os.path.join(archive_name, "COPYING"))
except Exception as e:
current_app.logger.error(
'Exception while adding dump metadata: %s', str(e), exc_info=True)
raise

archive_tables_dir = os.path.join(temp_dir, 'canonical')
create_path(archive_tables_dir)
def create_mapping_dump(location: str, dump_time: datetime, use_lb_conn: bool):
""" Create postgres database dump of the mapping supplemental tables. """
tables_collection = DumpTablesCollection(
engine_name=DumpEngineName.ts if use_lb_conn else DumpEngineName.mb,
tables=MAPPING_TABLES
)

for table in tables:
try:
engine_name = tables[table]['engine']
if engine_name == 'mb':
engine = mb_engine
elif engine_name == 'lb_if_set' and lb_engine:
engine = lb_engine
elif engine_name == 'lb_if_set':
engine = mb_engine
else:
raise ValueError(f'Unknown table engine name: {engine_name}')
with engine.connect() as connection:
with connection.begin() as transaction:
cursor = connection.connection.cursor()
copy_table(
cursor=cursor,
location=archive_tables_dir,
columns=tables[table]['columns'],
table_name=table,
)
transaction.rollback()
except Exception as e:
current_app.logger.error(
'Error while copying table %s: %s', table, str(e), exc_info=True)
raise
archive_name = "musicbrainz-canonical-dump-{time}".format(
time=dump_time.strftime("%Y%m%d-%H%M%S")
)

# Add the files to the archive in the order that they are defined in the dump definition.
for table, tabledata in tables.items():
filename = tabledata['filename']
tar.add(os.path.join(archive_tables_dir, table),
arcname=os.path.join(archive_name, 'canonical', filename))
metadata = {"TIMESTAMP": dump_time}
with zstd_dump(location, archive_name, metadata, DUMP_DEFAULT_THREAD_COUNT) as (zstd, tar, temp_dir, archive_path):
archive_tables_dir = os.path.join(temp_dir, "canonical")
create_path(archive_tables_dir)

shutil.rmtree(temp_dir)
tables_collection.dump_tables(archive_tables_dir)

zstd.stdin.close()
for table in tables_collection.tables:
tar.add(
os.path.join(archive_tables_dir, table.filename),
arcname=os.path.join(archive_name, "canonical", table.filename)
)

zstd.wait()
return archive_path
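zstd_dump() is imported from listenbrainz/dumps/exporter.py, whose diff is not rendered on this page. Judging by the archive plumbing removed from this module (the zstd subprocess, the streamed tar, the TIMESTAMP/COPYING metadata, and the temp-dir cleanup), a context manager along these lines would fit; treat it as a sketch of the idea, not the actual implementation, and note that details such as compression level and thread flags are assumptions.

# Hedged sketch of a zstd_dump() context manager; the real one lives in
# listenbrainz/dumps/exporter.py and may differ.
import os
import shutil
import subprocess
import tarfile
import tempfile
from contextlib import contextmanager

from listenbrainz import DUMP_LICENSE_FILE_PATH


@contextmanager
def zstd_dump(location, archive_name, metadata, threads):
    archive_path = os.path.join(location, f"{archive_name}.tar.zst")
    with open(archive_path, "w") as archive:
        zstd = subprocess.Popen(["zstd", "--compress", "-10", f"-T{threads}"],
                                stdin=subprocess.PIPE, stdout=archive)
        temp_dir = tempfile.mkdtemp()
        try:
            with tarfile.open(fileobj=zstd.stdin, mode="w|") as tar:
                # Metadata files (e.g. TIMESTAMP) plus the license go in first.
                for name, value in metadata.items():
                    path = os.path.join(temp_dir, name)
                    with open(path, "w") as f:
                        f.write(value.isoformat(" ") if hasattr(value, "isoformat") else str(value))
                    tar.add(path, arcname=os.path.join(archive_name, name))
                tar.add(DUMP_LICENSE_FILE_PATH, arcname=os.path.join(archive_name, "COPYING"))

                # The caller adds its table files inside this block.
                yield zstd, tar, temp_dir, archive_path
        finally:
            shutil.rmtree(temp_dir)
            zstd.stdin.close()
            zstd.wait()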


def create_mapping_dump(location: str, dump_time: datetime, use_lb_conn: bool):
""" Create postgres database dump of the mapping supplemental tables.
"""
if use_lb_conn:
lb_engine = timescale.engine
else:
lb_engine = None
musicbrainz_db.init_db_engine(current_app.config['MB_DATABASE_MAPPING_URI'])
return _create_dump(
location=location,
lb_engine=lb_engine,
mb_engine=musicbrainz_db.engine,
tables=PUBLIC_TABLES_MAPPING,
dump_time=dump_time
)


def copy_table(cursor, location, columns, table_name):
""" Copies a PostgreSQL table to a file

Arguments:
cursor: a psycopg cursor
location: the directory where the table should be copied
columns: a comma seperated string listing the columns of the table
that should be dumped
table_name: the name of the table to be copied
"""
table, fields = _escape_table_columns(table_name, columns)
with open(os.path.join(location, table_name), 'w') as f:
query = SQL("COPY (SELECT {fields} FROM {table}) TO STDOUT WITH CSV HEADER") \
.format(fields=fields, table=table)
cursor.copy_expert(query, f)
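The DumpTable, DumpFormat, DumpEngineName, and DumpTablesCollection names used throughout the new mapping.py come from listenbrainz/dumps/models.py, which is not included in this diff view. A minimal sketch of how such models could be declared, inferred from the fields used above; the enum members and the dump_tables body are assumptions.

# Hypothetical sketch of the dump model types used above; the real definitions
# live in listenbrainz/dumps/models.py and may differ.
import enum
from dataclasses import dataclass
from typing import List, Sequence, Union

from psycopg2.sql import Composable, Identifier


class DumpFormat(enum.Enum):
    csv = "csv"                 # only the csv variant appears in this diff


class DumpEngineName(enum.Enum):
    lb = "lb"                   # main postgres database (assumed member)
    ts = "ts"                   # timescale database
    mb = "mb"                   # musicbrainz mapping database


@dataclass
class DumpTable:
    table_name: Identifier                       # e.g. Identifier("mapping", "canonical_release_redirect")
    filename: str                                # file name inside the archive
    file_format: DumpFormat
    columns: Sequence[Union[str, Composable]]    # plain column names or SQL fragments


@dataclass
class DumpTablesCollection:
    engine_name: DumpEngineName                  # which database to connect to
    tables: List[DumpTable]

    def dump_tables(self, location: str) -> None:
        """Export every table in the collection to one file per table under location."""
        # The real method presumably connects using engine_name and runs a
        # COPY (SELECT ...) TO STDOUT per table into location/<table.filename>.
        raise NotImplementedError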