
Commit 492af14

Merge pull request #3217 from metabrainz/source-profile
1. Upgrade the Spark cluster to 3.5.5, Hadoop to 3.4.1, Python to 3.13, and bump other Python dependencies.
2. Add a cleanup script to remove old Spark applications from workers.
3. Remove the use of the deprecated SQLContext.
4. Use read_files_from_HDFS where possible (a short sketch of the substitutions for 3 and 4 follows the file summary below).
5. Fix artist map stats broken due to null country values.
6. Add a try/except around each request consumer job to avoid kombu client crashes.
7. Disable readSideCharPadding to avoid OOMs during the artist country data import.

Some of the changes for 1 and 2 are implemented in recent commits of https://github.com/metabrainz/ansible-role-spark.
2 parents: a222f52 + 53a3bb5 · commit 492af14

27 files changed: +76, -183 lines
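
Most of the file diffs below are the same two mechanical substitutions for points 3 and 4 of the summary: SQL goes through the SparkSession itself instead of the removed sql_context, and parquet reads go through the existing read_files_from_HDFS helper instead of sql_context.read.parquet. A minimal sketch of the pattern, assuming an initialised listenbrainz_spark.session and the helper behaviour suggested by the diffs (the view name and query here are illustrative only):

    import listenbrainz_spark
    from listenbrainz_spark.path import RECORDING_LENGTH_DATAFRAME
    from listenbrainz_spark.utils import read_files_from_HDFS

    # was: listenbrainz_spark.sql_context.read.parquet(config.HDFS_CLUSTER_URI + RECORDING_LENGTH_DATAFRAME)
    metadata_df = read_files_from_HDFS(RECORDING_LENGTH_DATAFRAME)
    metadata_df.createOrReplaceTempView("recording_length")   # illustrative view name

    # was: listenbrainz_spark.sql_context.sql(...)
    row = listenbrainz_spark.session.sql(
        "SELECT count(*) AS cnt FROM recording_length"        # illustrative query
    ).collect()[0]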

Dockerfile.spark

+1 -1

@@ -4,7 +4,7 @@ COPY docker/spark-cluster-config/test/core-site.xml $HADOOP_HOME/etc/hadoop/core
 COPY docker/spark-cluster-config/test/hdfs-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml
 COPY docker/spark-cluster-config/test/spark-env.sh $SPARK_HOME/conf/spark-env.sh

-RUN pip3 install pip==21.0.1
+RUN pip3 install pip==25.0.1 setuptools wheel

 WORKDIR /rec
docker/Dockerfile.spark.base

+6 -6

@@ -1,4 +1,4 @@
-ARG PYTHON_BASE_IMAGE_VERSION=3.9-focal-20220315
+ARG PYTHON_BASE_IMAGE_VERSION=3.12-20241130
 FROM metabrainz/python:$PYTHON_BASE_IMAGE_VERSION

 ARG PYTHON_BASE_IMAGE_VERSION

@@ -26,9 +26,9 @@ RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSI

 WORKDIR /usr/local

-ENV JAVA_VERSION 11.0.21
+ENV JAVA_VERSION 11.0.26
 ENV JAVA_MAJOR_VERSION 11
-ENV JAVA_BUILD_VERSION 9
+ENV JAVA_BUILD_VERSION 4
 RUN wget https://github.com/adoptium/temurin${JAVA_MAJOR_VERSION}-binaries/releases/download/jdk-${JAVA_VERSION}%2B${JAVA_BUILD_VERSION}/OpenJDK${JAVA_MAJOR_VERSION}U-jdk_x64_linux_hotspot_${JAVA_VERSION}_${JAVA_BUILD_VERSION}.tar.gz \
     && tar xzf OpenJDK${JAVA_MAJOR_VERSION}U-jdk_x64_linux_hotspot_${JAVA_VERSION}_${JAVA_BUILD_VERSION}.tar.gz \
     && mv jdk-${JAVA_VERSION}+${JAVA_BUILD_VERSION} /usr/local/jdk \

@@ -38,7 +38,7 @@ ENV PATH $JAVA_HOME/bin:$PATH

 COPY apache-download.sh /apache-download.sh

-ENV HADOOP_VERSION 3.3.5
+ENV HADOOP_VERSION 3.4.1
 RUN /apache-download.sh hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz \
     && tar xzf hadoop-${HADOOP_VERSION}.tar.gz \
     && mv hadoop-${HADOOP_VERSION} /usr/local/hadoop \

@@ -48,7 +48,7 @@ ENV PATH $HADOOP_HOME/bin:$PATH

 RUN mkdir /hdfs

-ENV SPARK_VERSION 3.4.0
+ENV SPARK_VERSION 3.5.5
 RUN /apache-download.sh spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-without-hadoop.tgz \
     && tar xzf spark-${SPARK_VERSION}-bin-without-hadoop.tgz \
     && mv spark-${SPARK_VERSION}-bin-without-hadoop /usr/local/spark \

@@ -57,6 +57,6 @@ ENV SPARK_HOME /usr/local/spark
 ENV PATH $SPARK_HOME/bin:$PATH
 ENV PYTHONPATH $SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$SPARK_HOME/python:$PYTHONPATH

-ENV POSTGRESQL_DRIVER_VERSION 42.7.0
+ENV POSTGRESQL_DRIVER_VERSION 42.7.5
 RUN wget -O postgresql-${POSTGRESQL_DRIVER_VERSION}.jar https://jdbc.postgresql.org/download/postgresql-${POSTGRESQL_DRIVER_VERSION}.jar \
     && mv postgresql-${POSTGRESQL_DRIVER_VERSION}.jar ${SPARK_HOME}/jars

docker/start-spark-request-consumer.sh

+7 -6

@@ -5,14 +5,14 @@ cd "$(dirname "${BASH_SOURCE[0]}")/../"

 rm -rf pyspark_venv pyspark_venv.tar.gz listenbrainz_spark_request_consumer.zip models.zip

-python3 -m venv pyspark_venv
+python3.13 -m venv pyspark_venv
 source pyspark_venv/bin/activate
-pip install -r requirements_spark.txt
-pip install venv-pack
+pip install --upgrade pip setuptools wheel venv-pack -r requirements_spark.txt
 venv-pack -o pyspark_venv.tar.gz

-export PYSPARK_DRIVER_PYTHON=python
-export PYSPARK_PYTHON=./environment/bin/python
+VENV_PATH="$(realpath pyspark_venv)"
+export PYSPARK_DRIVER_PYTHON="${VENV_PATH}/bin/python3.13"
+export PYSPARK_PYTHON=./environment/bin/python3.13

 GIT_COMMIT_SHA="$(git describe --tags --dirty --always)"
 echo "$GIT_COMMIT_SHA" > .git-version

@@ -25,8 +25,9 @@ source spark_config.sh
     --master spark://leader:7077 \
     --archives "pyspark_venv.tar.gz#environment" \
     --conf "spark.cores.max=$MAX_CORES" \
+    --conf "spark.driver.maxResultSize=$DRIVER_MAX_RESULT_SIZE" \
     --executor-cores "$EXECUTOR_CORES" \
     --executor-memory "$EXECUTOR_MEMORY" \
     --driver-memory "$DRIVER_MEMORY" \
     --py-files listenbrainz_spark_request_consumer.zip,models.zip \
-    spark_manage.py request_consumer
+    spark_manage.py

listenbrainz/db/stats.py

-1

@@ -31,7 +31,6 @@
 from sentry_sdk import start_span

 from data.model.common_stat import StatApi
-from data.model.user_artist_map import UserArtistMapRecord
 from listenbrainz.db import couchdb
 from listenbrainz.db.couchdb import try_insert_data
 from listenbrainz.db.user import get_users_by_id

listenbrainz_spark/__init__.py

+6 -7

@@ -5,21 +5,20 @@
 _formatter = logging.Formatter("%(asctime)s %(name)-20s %(levelname)-8s %(message)s")
 _handler.setFormatter(_formatter)

-_logger = logging.getLogger("listenbrainz_spark")
+_logger = logging.getLogger(__name__)
 _logger.setLevel(logging.INFO)
 _logger.addHandler(_handler)

 import sentry_sdk

 from py4j.protocol import Py4JJavaError
-from pyspark.sql import SparkSession, SQLContext
+from pyspark.sql import SparkSession

 from listenbrainz_spark.exceptions import SparkSessionNotInitializedException
 from listenbrainz_spark import config

 session = None
 context = None
-sql_context = None


 def init_spark_session(app_name):

@@ -30,15 +29,16 @@ def init_spark_session(app_name):
     """
     if hasattr(config, "LOG_SENTRY"):  # attempt to initialize sentry_sdk only if configuration available
         sentry_sdk.init(**config.LOG_SENTRY)
-    global session, context, sql_context
+    global session, context
     try:
+        # readSideCharPadding enabled causes OOM when importing artist_country_code cache data
         session = SparkSession \
            .builder \
            .appName(app_name) \
+           .config("spark.sql.readSideCharPadding", "false") \
            .getOrCreate()
        context = session.sparkContext
        context.setLogLevel("ERROR")
-       sql_context = SQLContext(context)
    except Py4JJavaError as err:
        raise SparkSessionNotInitializedException(app_name, err.java_exception)

@@ -51,7 +51,7 @@ def init_test_session(app_name):
     Set spark.driver.host to avoid tests from hanging (get_listens_from_dump hangs when taking union
     of full dump and incremental dump listens), see https://issues.apache.org/jira/browse/SPARK-16087
     """
-    global session, context, sql_context
+    global session, context
     try:
         session = SparkSession \
            .builder \

@@ -69,6 +69,5 @@ def init_test_session(app_name):
            .getOrCreate()
        context = session.sparkContext
        context.setLogLevel("ERROR")
-       sql_context = SQLContext(context)
    except Py4JJavaError as err:
        raise SparkSessionNotInitializedException(app_name, err.java_exception)
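
For context on the new config flag: spark.sql.readSideCharPadding was introduced in Spark 3.4 and defaults to true, padding CHAR(n) column values to their declared length when they are read, in addition to the existing write-side padding. The commit disables it globally on the session because that padding blows up memory when importing the artist_country_code cache data. A minimal sketch of the same setting on a standalone session (names here are illustrative, not from the repo):

    from pyspark.sql import SparkSession

    # Illustrative only: any job reading CHAR(n) columns can opt out of
    # read-side padding the same way.
    spark = (
        SparkSession.builder
        .appName("char-padding-example")                     # hypothetical app name
        .config("spark.sql.readSideCharPadding", "false")    # do not pad CHAR(n) values on read
        .getOrCreate()
    )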

listenbrainz_spark/hdfs/utils.py

+2 -58

@@ -1,32 +1,12 @@
 import logging
-import os
 from pathlib import Path

-from hdfs.util import HdfsError
-
 from listenbrainz_spark import hdfs_connection
-from listenbrainz_spark.exceptions import (HDFSDirectoryNotDeletedException,
-                                           PathNotFoundException)
+from listenbrainz_spark.exceptions import HDFSDirectoryNotDeletedException

 logger = logging.getLogger(__name__)


-# A typical listen is of the form:
-# {
-#   "artist_mbids": [],
-#   "artist_name": "Cake",
-#   "listened_at": "2005-02-28T20:39:08Z",
-#   "recording_msid": "c559b2f8-41ff-4b55-ab3c-0b57d9b85d11",
-#   "recording_mbid": "1750f8ca-410e-4bdc-bf90-b0146cb5ee35",
-#   "release_mbid": "",
-#   "release_name": null,
-#   "tags": [],
-#   "track_name": "Tougher Than It Is"
-#   "user_id": 5,
-# }
-# All the keys in the dict are column/field names in a Spark dataframe.
-
-
 def create_dir(path):
     """ Creates a directory in HDFS.
     Args:

@@ -57,25 +37,7 @@ def path_exists(path):
         path (string): Path to check status for.
     Note: Caller is responsible for initializing HDFS connection.
     """
-    path_found = hdfs_connection.client.status(path, strict=False)
-    if path_found:
-        return True
-    return False
-
-
-def hdfs_walk(path, depth=0):
-    """ Depth-first walk of HDFS filesystem.
-    Args:
-        path (str): Path to start DFS.
-        depth (int): Maximum depth to explore files/folders. 0 for no limit.
-    Returns:
-        walk: a generator yeilding tuples (path, dirs, files).
-    """
-    try:
-        walk = hdfs_connection.client.walk(hdfs_path=path, depth=depth)
-        return walk
-    except HdfsError as err:
-        raise PathNotFoundException(str(err), path)
+    return hdfs_connection.client.status(path, strict=False)


 def upload_to_HDFS(hdfs_path, local_path):

@@ -96,24 +58,6 @@ def rename(hdfs_src_path: str, hdfs_dst_path: str):
     hdfs_connection.client.rename(hdfs_src_path, hdfs_dst_path)


-def copy(hdfs_src_path: str, hdfs_dst_path: str, overwrite: bool = False):
-    """ Copy a file or folder in HDFS
-    Args:
-        hdfs_src_path – Source path.
-        hdfs_dst_path – Destination path. If the path already exists and is a directory, the source will be copied into it.
-        overwrite - Wether to overwrite the path if it already exists.
-    """
-    walk = hdfs_walk(hdfs_src_path)
-
-    for (root, dirs, files) in walk:
-        for _file in files:
-            src_file_path = os.path.join(root, _file)
-            dst_file_path = os.path.join(hdfs_dst_path, os.path.relpath(src_file_path, hdfs_src_path))
-            with hdfs_connection.client.read(src_file_path) as reader:
-                with hdfs_connection.client.write(dst_file_path, overwrite=overwrite) as writer:
-                    writer.write(reader.read())
-
-
 def move(hdfs_src_path: str, hdfs_dest_path: str):
     """ Move a file or folder in HDFS """
     # Delete existing destination directory if any

listenbrainz_spark/listens/cache.py

+1 -1

@@ -1,7 +1,7 @@
 import os
 from typing import Optional

-from pandas import DataFrame
+from pyspark.sql import DataFrame

 from listenbrainz_spark.listens.metadata import get_listens_metadata
 from listenbrainz_spark.utils import read_files_from_HDFS

listenbrainz_spark/listens/compact.py

+2 -2

@@ -36,7 +36,7 @@ def write_partitioned_listens(table):
     new_base_listens_location = os.path.join(new_location, "base")

     listenbrainz_spark \
-        .sql_context \
+        .session \
         .sql(query) \
         .write \
         .partitionBy("year", "month") \

@@ -48,7 +48,7 @@ def write_partitioned_listens(table):
          from parquet.`{new_base_listens_location}`
     """
     result = listenbrainz_spark \
-        .sql_context \
+        .session \
         .sql(query) \
         .collect()[0]

listenbrainz_spark/listens/data.py

+1 -1

@@ -112,7 +112,7 @@ def get_base_listens_df(location, start: datetime, end: datetime):
              , artist_credit_mbids
          from parquet.`{location}`
     """) + where_clause
-    return listenbrainz_spark.sql_context.sql(query)
+    return listenbrainz_spark.session.sql(query)


 def get_latest_listen_ts() -> datetime:

listenbrainz_spark/listens/dump.py

+2 -2

@@ -236,7 +236,7 @@ def process_incremental_listens_dump(temp_path):
          GROUP BY user_id
     """
     listenbrainz_spark \
-        .sql_context \
+        .session \
         .sql(query) \
         .repartition(1) \
         .write \

@@ -251,7 +251,7 @@ def process_incremental_listens_dump(temp_path):
          from parquet.`{inc_listens_location}`
     """
     result = listenbrainz_spark \
-        .sql_context \
+        .session \
         .sql(query) \
         .collect()[0]
     update_listens_metadata(location, result.max_listened_at, result.max_created)

listenbrainz_spark/mlhd/download.py

+1 -1

@@ -43,7 +43,7 @@ def post_process_mlhd_plus():
     """
     for chunk in MLHD_PLUS_CHUNKS:
         listenbrainz_spark\
-            .sql_context\
+            .session\
             .read\
             .format("parquet")\
             .option("pathGlobFilter", f"{chunk}*.parquet")\

listenbrainz_spark/postgres/utils.py

+1 -1

@@ -7,7 +7,7 @@

 def load_from_db(url, user, password, query):
     return listenbrainz_spark\
-        .sql_context\
+        .session\
         .read\
         .format("jdbc")\
         .option("url", url)\

listenbrainz_spark/recommendations/recording/create_dataframes.py

+1 -1

@@ -169,7 +169,7 @@ def save_playcounts_df(listens_df, recordings_df, users_df, metadata, save_path)
         .agg(func.count('recording_id').alias('playcount'))
     playcounts_df.createOrReplaceTempView("playcounts")

-    transformed_listencounts = listenbrainz_spark.sql_context.sql(f"""
+    transformed_listencounts = listenbrainz_spark.session.sql(f"""
         SELECT spark_user_id
              , recording_id
              , playcount

listenbrainz_spark/recommendations/recording/recommend.py

+1 -1

@@ -40,7 +40,7 @@ def get_most_recent_model_meta():
         model_id (str): Model identification string.
     """
     utils.read_files_from_HDFS(path.RECOMMENDATION_RECORDING_MODEL_METADATA).createOrReplaceTempView("model_metadata")
-    meta = listenbrainz_spark.sql_context.sql("""
+    meta = listenbrainz_spark.session.sql("""
         SELECT model_id, model_html_file
           FROM model_metadata
      ORDER BY model_created DESC

listenbrainz_spark/request_consumer/request_consumer.py

+9 -15

@@ -102,12 +102,15 @@ def push_to_result_queue(self, messages):
            logger.info("No messages calculated")

    def callback(self, message: Message):
-        request = json.loads(message.body)
-        logger.info('Received a request!')
-        messages = self.get_result(request)
-        if messages:
-            self.push_to_result_queue(messages)
-        logger.info('Request done!')
+        try:
+            request = json.loads(message.body)
+            logger.info('Received a request!')
+            messages = self.get_result(request)
+            if messages:
+                self.push_to_result_queue(messages)
+            logger.info('Request done!')
+        except Exception as e:
+            logger.error("Error while processing request: %s", str(e), exc_info=True)

    def get_consumers(self, _, channel):
        return [

@@ -135,12 +138,3 @@ def start(self, app_name):
            except Exception as e:
                logger.critical("Error in spark-request-consumer: %s", str(e), exc_info=True)
                time.sleep(2)
-
-
-def main(app_name):
-    rc = RequestConsumer()
-    rc.start(app_name)
-
-
-if __name__ == '__main__':
-    main('spark-writer')
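
Design note on the hunk above: the new try/except in callback() (point 6 of the summary) makes a failure in a single stats or recommendation job get logged instead of propagating into kombu's consume loop and killing the consumer; the outer loop in start(), visible as context in the second hunk, still catches fatal errors, logs them and retries after a short sleep. The module-level main() entry point is dropped because the request consumer is now started via spark_manage.py, as shown in the start-spark-request-consumer.sh diff.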

listenbrainz_spark/similarity/artist.py

+3 -4

@@ -2,11 +2,10 @@

 from more_itertools import chunked

-import listenbrainz_spark
-from listenbrainz_spark import config
 from listenbrainz_spark.path import RECORDING_LENGTH_DATAFRAME, ARTIST_CREDIT_MBID_DATAFRAME
 from listenbrainz_spark.stats import run_query
 from listenbrainz_spark.listens.data import get_listens_from_dump
+from listenbrainz_spark.utils import read_files_from_HDFS

 RECORDINGS_PER_MESSAGE = 10000
 # the duration value in seconds to use for track whose duration data in not available in MB

@@ -128,10 +127,10 @@ def main(days, session, contribution, threshold, limit, skip, is_production_data

     get_listens_from_dump(from_date, to_date).createOrReplaceTempView(table)

-    metadata_df = listenbrainz_spark.sql_context.read.parquet(config.HDFS_CLUSTER_URI + RECORDING_LENGTH_DATAFRAME)
+    metadata_df = read_files_from_HDFS(RECORDING_LENGTH_DATAFRAME)
     metadata_df.createOrReplaceTempView(metadata_table)

-    artist_credit_df = listenbrainz_spark.sql_context.read.parquet(config.HDFS_CLUSTER_URI + ARTIST_CREDIT_MBID_DATAFRAME)
+    artist_credit_df = read_files_from_HDFS(ARTIST_CREDIT_MBID_DATAFRAME)
     artist_credit_df.createOrReplaceTempView(artist_credit_table)

     skip_threshold = -skip

0 commit comments