Skip to content

Commit a222f52

Browse files
committed Mar 7, 2025
Migrate from xz to zstd for dumps compression
Tested on jsonl statistics dumps which are 43G uncompressed; the results are as follows:

|             | xz     | zstd (-10) |
|-------------|--------|------------|
| Compression | 8.3x   | 6.7x       |
| Time Taken  | 87m 3s | 33m 12s    |

xz compresses a little better but zstd is much faster, and I feel the overall tradeoffs are in favour of zstd.
1 parent 88137c6 commit a222f52

File tree

9 files changed

+91
-87
lines changed

9 files changed

+91
-87
lines changed
 

‎admin/create-dumps.sh

+5-5
Original file line numberDiff line numberDiff line change
@@ -236,22 +236,22 @@ touch "$FTP_CURRENT_DUMP_DIR/.rsync-filter"
236236

237237
add_rsync_include_rule \
238238
"$FTP_CURRENT_DUMP_DIR" \
239-
"listenbrainz-public-dump-$DUMP_TIMESTAMP.tar.xz"
239+
"listenbrainz-public-dump-$DUMP_TIMESTAMP.tar.zst"
240240
add_rsync_include_rule \
241241
"$FTP_CURRENT_DUMP_DIR" \
242-
"listenbrainz-public-timescale-dump-$DUMP_TIMESTAMP.tar.xz"
242+
"listenbrainz-public-timescale-dump-$DUMP_TIMESTAMP.tar.zst"
243243
add_rsync_include_rule \
244244
"$FTP_CURRENT_DUMP_DIR" \
245-
"listenbrainz-listens-dump-$DUMP_ID-$DUMP_TIMESTAMP-$DUMP_TYPE.tar.xz"
245+
"listenbrainz-listens-dump-$DUMP_ID-$DUMP_TIMESTAMP-$DUMP_TYPE.tar.zst"
246246
add_rsync_include_rule \
247247
"$FTP_CURRENT_DUMP_DIR" \
248248
"listenbrainz-spark-dump-$DUMP_ID-$DUMP_TIMESTAMP-$DUMP_TYPE.tar"
249249
add_rsync_include_rule \
250250
"$FTP_CURRENT_DUMP_DIR" \
251-
"listenbrainz-feedback-dump-$DUMP_TIMESTAMP.tar.xz"
251+
"listenbrainz-feedback-dump-$DUMP_TIMESTAMP.tar.zst"
252252
add_rsync_include_rule \
253253
"$FTP_CURRENT_DUMP_DIR" \
254-
"listenbrainz-statistics-dump-$DUMP_TIMESTAMP.tar.xz"
254+
"listenbrainz-statistics-dump-$DUMP_TIMESTAMP.tar.zst"
255255
add_rsync_include_rule \
256256
"$FTP_CURRENT_DUMP_DIR" \
257257
"musicbrainz-canonical-dump-$DUMP_TIMESTAMP.tar.zst"

‎docs/users/listenbrainz-dumps.rst

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,28 @@ File Descriptions
2222

2323
A ListenBrainz data dump consists of three archives:
2424

25-
#. ``listenbrainz-public-dump.tar.xz``
25+
#. ``listenbrainz-public-dump.tar.zst``
2626

27-
#. ``listenbrainz-listens-dump.tar.xz``
27+
#. ``listenbrainz-listens-dump.tar.zst``
2828

29-
#. ``listenbrainz-listens-dump-spark.tar.xz``
29+
#. ``listenbrainz-listens-dump-spark.tar.zst``
3030

3131

32-
listenbrainz-public-dump.tar.xz
32+
listenbrainz-public-dump.tar.zst
3333
-------------------------------
3434

3535
This file contains information about ListenBrainz users and statistics derived
3636
from listens submitted to ListenBrainz calculated from users, artists, recordings etc.
3737

3838

39-
listenbrainz-listens-dump.tar.xz
39+
listenbrainz-listens-dump.tar.zst
4040
--------------------------------
4141

4242
This is the core ListenBrainz data dump. This file contains all the listens
4343
submitted to ListenBrainz by its users.
4444

4545

46-
listenbrainz-listens-dump-spark.tar.xz
46+
listenbrainz-listens-dump-spark.tar.zst
4747
--------------------------------------
4848

4949
This is also a dump of the core ListenBrainz listen data. These dumps are

‎listenbrainz/db/dump.py

+11-14
Original file line numberDiff line numberDiff line change
@@ -393,16 +393,13 @@ def _create_dump(location: str, db_engine: Optional[sqlalchemy.engine.Engine], d
393393
dump_type=dump_type,
394394
time=dump_time.strftime('%Y%m%d-%H%M%S')
395395
)
396-
archive_path = os.path.join(location, '{archive_name}.tar.xz'.format(
397-
archive_name=archive_name,
398-
))
396+
archive_path = os.path.join(location, f'{archive_name}.tar.zst')
399397

400398
with open(archive_path, 'w') as archive:
399+
zstd_command = ['zstd', '--compress', f'-T{threads}', '-10']
400+
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)
401401

402-
xz_command = ['xz', '--compress', '-T{threads}'.format(threads=threads)]
403-
xz = subprocess.Popen(xz_command, stdin=subprocess.PIPE, stdout=archive)
404-
405-
with tarfile.open(fileobj=xz.stdin, mode='w|') as tar:
402+
with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:
406403

407404
temp_dir = tempfile.mkdtemp()
408405

@@ -462,9 +459,9 @@ def _create_dump(location: str, db_engine: Optional[sqlalchemy.engine.Engine], d
462459

463460
shutil.rmtree(temp_dir)
464461

465-
xz.stdin.close()
462+
zstd.stdin.close()
466463

467-
xz.wait()
464+
zstd.wait()
468465
return archive_path
469466

470467

@@ -831,7 +828,7 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
831828
""" Import dump present in passed archive path into postgres db.
832829
833830
Arguments:
834-
archive_path: path to the .tar.xz archive to be imported
831+
archive_path: path to the .tar.zst archive to be imported
835832
db_engine: an sqlalchemy Engine instance for making a connection
836833
tables: dict of tables present in the archive with table name as key and
837834
columns to import as values
@@ -840,13 +837,13 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
840837
db.DUMP_DEFAULT_THREAD_COUNT
841838
"""
842839

843-
xz_command = ['xz', '--decompress', '--stdout', archive_path, '-T{threads}'.format(threads=threads)]
844-
xz = subprocess.Popen(xz_command, stdout=subprocess.PIPE)
840+
zstd_command = ['zstd', '--decompress', '--stdout', archive_path, f'-T{threads}']
841+
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)
845842

846843
connection = db_engine.raw_connection()
847844
try:
848845
cursor = connection.cursor()
849-
with tarfile.open(fileobj=xz.stdout, mode='r|') as tar:
846+
with tarfile.open(fileobj=zstd.stdout, mode='r|') as tar:
850847
for member in tar:
851848
file_name = member.name.split('/')[-1]
852849

@@ -875,7 +872,7 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
875872
current_app.logger.info('Imported table %s', file_name)
876873
finally:
877874
connection.close()
878-
xz.stdout.close()
875+
zstd.stdout.close()
879876

880877

881878
def _update_sequence(db_engine: sqlalchemy.engine.Engine, seq_name, table_name):

‎listenbrainz/db/tests/test_dump.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ def test_create_stats_dump(self):
8282

8383
found = set()
8484
found_stats = None
85-
xz_command = ['xz', '--decompress', '--stdout', dump_location, '-T4']
86-
xz = subprocess.Popen(xz_command, stdout=subprocess.PIPE)
87-
with tarfile.open(fileobj=xz.stdout, mode='r|') as tar:
85+
zstd_command = ['zstd', '--decompress', '--stdout', dump_location, '-T4']
86+
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)
87+
with tarfile.open(fileobj=zstd.stdout, mode='r|') as tar:
8888
for member in tar:
8989
file_name = member.name.split('/')[-1]
9090
if file_name.endswith(".jsonl"):

‎listenbrainz/db/tests/test_dump_manager.py

+14-11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import os
2323
import shutil
24+
import subprocess
2425
import tarfile
2526
import tempfile
2627
import time
@@ -148,13 +149,13 @@ def test_create_full_db(self):
148149
# dumps should contain the 7 archives
149150
archive_count = 0
150151
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
151-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
152+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
152153
archive_count += 1
153154
self.assertEqual(archive_count, 5)
154155

155156
private_archive_count = 0
156157
for file_name in os.listdir(os.path.join(self.tempdir_private, dump_name)):
157-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
158+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
158159
private_archive_count += 1
159160
self.assertEqual(private_archive_count, 2)
160161

@@ -197,13 +198,13 @@ def test_create_full_dump_with_id(self):
197198
# dumps should contain the 7 archives
198199
archive_count = 0
199200
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
200-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
201+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
201202
archive_count += 1
202203
self.assertEqual(archive_count, 5)
203204

204205
private_archive_count = 0
205206
for file_name in os.listdir(os.path.join(self.tempdir_private, dump_name)):
206-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
207+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
207208
private_archive_count += 1
208209
self.assertEqual(private_archive_count, 2)
209210

@@ -276,7 +277,7 @@ def test_create_incremental(self):
276277
# make sure that the dump contains a full listens and spark dump
277278
archive_count = 0
278279
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
279-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
280+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
280281
archive_count += 1
281282
self.assertEqual(archive_count, 2)
282283

@@ -315,14 +316,16 @@ def test_create_full_when_incremental_exists(self):
315316
# make sure that the dump contains a full listens and spark dump
316317
archive_count = 0
317318
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
318-
if file_name.endswith(".tar.xz") or file_name.endswith(".tar"):
319+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
319320
archive_count += 1
320321
self.assertEqual(archive_count, 2)
321322

322-
dump_file_name = dump_name.replace("dump", "listens-dump") + ".tar.xz"
323+
dump_file_name = dump_name.replace("dump", "listens-dump") + ".tar.zst"
323324
listens_dump_file = os.path.join(self.tempdir, dump_name, dump_file_name)
324-
with tarfile.open(listens_dump_file, "r:xz") as f:
325-
for member in f.getmembers():
325+
zstd_command = ["zstd", "--decompress", "--stdout", listens_dump_file, "-T4"]
326+
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)
327+
with tarfile.open(fileobj=zstd.stdout, mode="r|") as f:
328+
for member in f:
326329
if member.name.endswith(".listens"):
327330
lines = f.extractfile(member).readlines()
328331
# five listens were dumped as expected as only five listens were created until the
@@ -353,7 +356,7 @@ def test_create_incremental_dump_with_id(self):
353356
# dump should contain the listen and spark archive
354357
archive_count = 0
355358
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
356-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
359+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
357360
archive_count += 1
358361
self.assertEqual(archive_count, 2)
359362

@@ -419,6 +422,6 @@ def test_create_feedback(self):
419422
# make sure that the dump contains a feedback dump
420423
archive_count = 0
421424
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
422-
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
425+
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
423426
archive_count += 1
424427
self.assertEqual(archive_count, 1)

‎listenbrainz/listenstore/dump_listenstore.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def write_listens(self, temp_dir, tar_file, archive_name,
229229

230230
def dump_listens(self, location, dump_id, start_time, end_time, dump_type,
231231
threads=DUMP_DEFAULT_THREAD_COUNT):
232-
""" Dumps all listens in the ListenStore into a .tar.xz archive.
232+
""" Dumps all listens in the ListenStore into a .tar.zst archive.
233233
234234
Files are created with UUIDs as names. Each file can contain listens for a number of users.
235235
An index.json file is used to save which file contains the listens of which users.
@@ -254,12 +254,12 @@ def dump_listens(self, location, dump_id, start_time, end_time, dump_type,
254254
archive_name = '{}-full'.format(archive_name)
255255
else:
256256
archive_name = '{}-incremental'.format(archive_name)
257-
archive_path = os.path.join(location, f'{archive_name}.tar.xz')
257+
archive_path = os.path.join(location, f'{archive_name}.tar.zst')
258258
with open(archive_path, 'w') as archive:
259-
xz_command = ['xz', '--compress', f'-T{threads}']
260-
xz = subprocess.Popen(xz_command, stdin=subprocess.PIPE, stdout=archive)
259+
zstd_command = ['zstd', '--compress', f'-T{threads}', '-10']
260+
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)
261261

262-
with tarfile.open(fileobj=xz.stdin, mode='w|') as tar:
262+
with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:
263263
temp_dir = os.path.join(self.dump_temp_dir_root, str(uuid.uuid4()))
264264
create_path(temp_dir)
265265
self.write_dump_metadata(
@@ -276,9 +276,9 @@ def dump_listens(self, location, dump_id, start_time, end_time, dump_type,
276276
# remove the temporary directory
277277
shutil.rmtree(temp_dir)
278278

279-
xz.stdin.close()
279+
zstd.stdin.close()
280280

281-
xz.wait()
281+
zstd.wait()
282282
self.log.info('ListenBrainz listen dump done!')
283283
self.log.info('Dump present at %s!', archive_path)
284284
return archive_path

‎listenbrainz/listenstore/tests/test_dumplistenstore.py

+36-31
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import os
22
import shutil
3+
import subprocess
34
import tarfile
45
import tempfile
56
from datetime import datetime, timezone, timedelta
7+
from tempfile import TemporaryDirectory
68

79
from psycopg2.extras import execute_values
810

@@ -196,7 +198,7 @@ def test_dump_and_import_listens_escaped(self):
196198

197199
# test test_import_dump_many_users is gone -- why are we testing user dump/restore here??
198200

199-
def create_test_dump(self, archive_name, archive_path, schema_version=None):
201+
def create_test_dump(self, temp_dir, archive_name, archive_path, schema_version=None):
200202
""" Creates a test dump to test the import listens functionality.
201203
Args:
202204
archive_name (str): the name of the archive
@@ -206,40 +208,43 @@ def create_test_dump(self, archive_name, archive_path, schema_version=None):
206208
Returns:
207209
the full path to the archive created
208210
"""
209-
210-
temp_dir = tempfile.mkdtemp()
211-
with tarfile.open(archive_path, mode='w|xz') as tar:
212-
schema_version_path = os.path.join(temp_dir, 'SCHEMA_SEQUENCE')
213-
with open(schema_version_path, 'w') as f:
214-
f.write(str(schema_version or ' '))
215-
tar.add(schema_version_path,
216-
arcname=os.path.join(archive_name, 'SCHEMA_SEQUENCE'))
217-
211+
with open(archive_path, 'w') as archive:
212+
zstd_command = ['zstd', '--compress', '-T4']
213+
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)
214+
with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:
215+
schema_version_path = os.path.join(temp_dir, 'SCHEMA_SEQUENCE')
216+
with open(schema_version_path, 'w') as f:
217+
f.write(str(schema_version or ' '))
218+
tar.add(schema_version_path,
219+
arcname=os.path.join(archive_name, 'SCHEMA_SEQUENCE'))
220+
zstd.stdin.close()
221+
zstd.wait()
218222
return archive_path
219223

220224
def test_schema_mismatch_exception_for_dump_incorrect_schema(self):
221225
""" Tests that SchemaMismatchException is raised when the schema of the dump is old """
222-
223-
# create a temp archive with incorrect SCHEMA_VERSION_CORE
224-
temp_dir = tempfile.mkdtemp()
225-
archive_name = 'temp_dump'
226-
archive_path = os.path.join(temp_dir, archive_name + '.tar.xz')
227-
archive_path = self.create_test_dump(
228-
archive_name=archive_name,
229-
archive_path=archive_path,
230-
schema_version=LISTENS_DUMP_SCHEMA_VERSION - 1
231-
)
232-
with self.assertRaises(SchemaMismatchException):
233-
self.ls.import_listens_dump(archive_path)
226+
with TemporaryDirectory() as temp_dir:
227+
# create a temp archive with incorrect SCHEMA_VERSION_CORE
228+
archive_name = 'temp_dump'
229+
archive_path = os.path.join(temp_dir, archive_name + '.tar.zst')
230+
archive_path = self.create_test_dump(
231+
temp_dir=temp_dir,
232+
archive_name=archive_name,
233+
archive_path=archive_path,
234+
schema_version=LISTENS_DUMP_SCHEMA_VERSION - 1
235+
)
236+
with self.assertRaises(SchemaMismatchException):
237+
self.ls.import_listens_dump(archive_path)
234238

235239
def test_schema_mismatch_exception_for_dump_no_schema(self):
236240
""" Tests that SchemaMismatchException is raised when there is no schema version in the archive """
237-
238-
temp_dir = tempfile.mkdtemp()
239-
archive_name = 'temp_dump'
240-
archive_path = os.path.join(temp_dir, archive_name + '.tar.xz')
241-
242-
archive_path = self.create_test_dump(archive_name=archive_name, archive_path=archive_path)
243-
244-
with self.assertRaises(SchemaMismatchException):
245-
self.ls.import_listens_dump(archive_path)
241+
with TemporaryDirectory() as temp_dir:
242+
archive_name = 'temp_dump'
243+
archive_path = os.path.join(temp_dir, archive_name + '.tar.zst')
244+
archive_path = self.create_test_dump(
245+
temp_dir=temp_dir,
246+
archive_name=archive_name,
247+
archive_path=archive_path
248+
)
249+
with self.assertRaises(SchemaMismatchException):
250+
self.ls.import_listens_dump(archive_path)

0 commit comments

Comments
 (0)