10
10
from listenbrainz_spark import hdfs_connection
11
11
from listenbrainz_spark .config import HDFS_CLUSTER_URI
12
12
from listenbrainz_spark .path import INCREMENTAL_DUMPS_SAVE_PATH
13
- from listenbrainz_spark .schema import BOOKKEEPING_SCHEMA
13
+ from listenbrainz_spark .schema import BOOKKEEPING_SCHEMA , INCREMENTAL_BOOKKEEPING_SCHEMA
14
14
from listenbrainz_spark .stats import run_query
15
15
from listenbrainz_spark .stats .incremental .message_creator import MessageCreator
16
16
from listenbrainz_spark .stats .incremental .query_provider import QueryProvider
@@ -50,6 +50,7 @@ def __init__(self, provider: QueryProvider, message_creator: MessageCreator):
50
50
self .message_creator = message_creator
51
51
self ._cache_tables = []
52
52
self ._only_inc = None
53
+ self .incremental_table = None
53
54
54
55
@property
55
56
def only_inc (self ):
@@ -68,7 +69,7 @@ def _setup_cache_tables(self):
68
69
69
70
def partial_aggregate_usable (self ) -> bool :
70
71
""" Checks whether a partial aggregate exists and is fresh to generate the required stats. """
71
- metadata_path = self .provider .get_bookkeeping_path ()
72
+ metadata_path = f" { self .provider .get_bookkeeping_path ()} /full"
72
73
existing_aggregate_path = self .provider .get_existing_aggregate_path ()
73
74
74
75
try :
@@ -95,7 +96,7 @@ def create_partial_aggregate(self) -> DataFrame:
95
96
Returns:
96
97
DataFrame: The generated partial aggregate DataFrame.
97
98
"""
98
- metadata_path = self .provider .get_bookkeeping_path ()
99
+ metadata_path = f" { self .provider .get_bookkeeping_path ()} /full"
99
100
existing_aggregate_path = self .provider .get_existing_aggregate_path ()
100
101
101
102
table = f"{ self .provider .get_table_prefix ()} _full_listens"
@@ -129,12 +130,35 @@ def create_incremental_aggregate(self) -> DataFrame:
129
130
Returns:
130
131
DataFrame: The generated incremental aggregate DataFrame.
131
132
"""
132
- table = f"{ self .provider .get_table_prefix ()} _incremental_listens"
133
+ self . incremental_table = f"{ self .provider .get_table_prefix ()} _incremental_listens"
133
134
read_files_from_HDFS (INCREMENTAL_DUMPS_SAVE_PATH ) \
134
- .createOrReplaceTempView (table )
135
- inc_query = self .provider .get_aggregate_query (table , self ._cache_tables )
135
+ .createOrReplaceTempView (self . incremental_table )
136
+ inc_query = self .provider .get_aggregate_query (self . incremental_table , self ._cache_tables )
136
137
return run_query (inc_query )
137
138
139
+ def bookkeep_incremental_aggregate (self ):
140
+ metadata_path = f"{ self .provider .get_bookkeeping_path ()} /incremental"
141
+ query = f"SELECT max(created) AS latest_created_at FROM { self .incremental_table } "
142
+ latest_created_at = run_query (query ).collect ()[0 ]["latest_created_at" ]
143
+ metadata_df = listenbrainz_spark .session .createDataFrame (
144
+ [(latest_created_at , datetime .now ())],
145
+ schema = INCREMENTAL_BOOKKEEPING_SCHEMA
146
+ )
147
+ metadata_df .write .mode ("overwrite" ).json (metadata_path )
148
+
149
+ def get_incremental_dumps_existing_created (self ):
150
+ metadata_path = f"{ self .provider .get_bookkeeping_path ()} /incremental"
151
+ try :
152
+ metadata = listenbrainz_spark \
153
+ .session \
154
+ .read \
155
+ .schema (INCREMENTAL_BOOKKEEPING_SCHEMA ) \
156
+ .json (f"{ HDFS_CLUSTER_URI } { metadata_path } " ) \
157
+ .collect ()[0 ]
158
+ return metadata ["created" ]
159
+ except AnalysisException :
160
+ return None
161
+
138
162
def generate_stats (self ) -> DataFrame :
139
163
self ._setup_cache_tables ()
140
164
prefix = self .provider .get_table_prefix ()
@@ -155,14 +179,30 @@ def generate_stats(self) -> DataFrame:
155
179
inc_df .createOrReplaceTempView (inc_table )
156
180
157
181
if self ._only_inc :
158
- filter_query = self .provider .get_filter_aggregate_query (partial_table , inc_table )
159
- filtered_aggregate_df = run_query (filter_query )
160
- filtered_table = f"{ prefix } _filtered_aggregate"
161
- filtered_aggregate_df .createOrReplaceTempView (filtered_table )
182
+ existing_created = self .get_incremental_dumps_existing_created ()
183
+
184
+ filter_existing_query = self .provider .get_filter_aggregate_query (
185
+ partial_table ,
186
+ self .incremental_table ,
187
+ existing_created
188
+ )
189
+ filtered_existing_aggregate_df = run_query (filter_existing_query )
190
+ filtered_existing_table = f"{ prefix } _filtered_existing_aggregate"
191
+ filtered_existing_aggregate_df .createOrReplaceTempView (filtered_existing_table )
192
+
193
+ filter_incremental_query = self .provider .get_filter_aggregate_query (
194
+ inc_table ,
195
+ self .incremental_table ,
196
+ existing_created
197
+ )
198
+ filtered_incremental_aggregate_df = run_query (filter_incremental_query )
199
+ filtered_incremental_table = f"{ prefix } _filtered_incremental_aggregate"
200
+ filtered_incremental_aggregate_df .createOrReplaceTempView (filtered_incremental_table )
162
201
else :
163
- filtered_table = partial_table
202
+ filtered_existing_table = partial_table
203
+ filtered_incremental_table = inc_table
164
204
165
- final_query = self .provider .get_combine_aggregates_query (filtered_table , inc_table )
205
+ final_query = self .provider .get_combine_aggregates_query (filtered_existing_table , filtered_incremental_table )
166
206
final_df = run_query (final_query )
167
207
else :
168
208
final_df = partial_df
@@ -183,3 +223,4 @@ def run(self) -> Iterator[Dict]:
183
223
yield message
184
224
if not self .only_inc :
185
225
yield self .message_creator .create_end_message ()
226
+ self .bookkeep_incremental_aggregate ()
0 commit comments