Skip to content

Commit 494ec5e

Browse files
authored
Fix get_intermediate_stats_df (#3190)
When loading intermediate stats dataframe if start and end is not provided, it reads the file as is without excluding year and month column used for partitioning. This causes an error when trying to combining it with the incremental listens dataframe which doesn't have the two fields. Hence, fix the loading.
1 parent b38c6c7 commit 494ec5e

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

listenbrainz_spark/utils/__init__.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,15 @@ def get_listens_from_dump(start: datetime, end: datetime, include_incremental=Tr
177177

178178
def get_intermediate_stats_df(start: datetime, end: datetime):
179179
if start is None and end is None:
180-
return read_files_from_HDFS(LISTENBRAINZ_INTERMEDIATE_STATS_DIRECTORY)
181-
182-
filters = []
183-
184-
current = start
185-
step = relativedelta(months=1)
186-
while current <= end:
187-
filters.append(f"(year = {current.year} AND month = {current.month})")
188-
current += step
189-
combined_filter = "(\n " + "\n OR ".join(filters) + "\n )"
180+
where_clause = ""
181+
else:
182+
filters = []
183+
current = start
184+
step = relativedelta(months=1)
185+
while current <= end:
186+
filters.append(f"(year = {current.year} AND month = {current.month})")
187+
current += step
188+
where_clause = "where (\n " + "\n OR ".join(filters) + "\n )"
190189

191190
query = dedent(f"""\
192191
select listened_at
@@ -201,7 +200,7 @@ def get_intermediate_stats_df(start: datetime, end: datetime):
201200
, recording_mbid
202201
, artist_credit_mbids
203202
from parquet.`{LISTENBRAINZ_INTERMEDIATE_STATS_DIRECTORY}`
204-
where """) + combined_filter
203+
""") + where_clause
205204
return listenbrainz_spark.sql_context.sql(query)
206205

207206

0 commit comments

Comments
 (0)