@@ -301,22 +301,25 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
-        .limit(5000000)
     )
 
     # filter only the necessary entries
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+    filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
+    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+
+    # limit the number of records ingested
+    limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
+    filtered_df = filtered_df.limit(limit)
 
     # deserialize dataframe so that it can be properly saved
-    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
+    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)
 
     # insert data
     entity.stream = False  # to make sure we don't write to kafka
     entity.insert(deserialized_df, storage="offline")
 
     # update offsets
-    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
     offset_dict = json.loads(offset_string)
     for offset_row in df_offsets:
         offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
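The offset bookkeeping is the subtle part of this change. When the configured limit (write_options "job_limit", defaulting to 5000000) is not reached, every record read from Kafka in this batch has either been ingested or filtered out as belonging to another feature group or subject, so offsets can advance over the full unfiltered df; when the limit is hit, only the offsets of the limited subset are committed, so the remaining records are re-read on the next run. Below is a minimal plain-Python sketch of the loop that follows the collect(); the function name, topic name, and offset values are hypothetical, and the real job iterates over pyspark.sql.Row objects rather than dicts.

import json

def advance_offsets(offset_string: str, topic_name: str, df_offsets: list) -> str:
    """Sketch: move each partition's committed offset to one past the highest
    offset that was actually consumed in this run."""
    offset_dict = json.loads(offset_string)
    for offset_row in df_offsets:
        offset_dict[topic_name][str(offset_row["partition"])] = offset_row["offset"] + 1
    return json.dumps(offset_dict)

# Hypothetical example: partition 0 was read up to offset 1041, partition 1 up to 987.
print(advance_offsets(
    '{"fg_topic_1": {"0": "0", "1": "0"}}',
    "fg_topic_1",
    [{"partition": 0, "offset": 1041}, {"partition": 1, "offset": 987}],
))
# -> {"fg_topic_1": {"0": 1042, "1": 988}}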
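The featureGroupId / subjectId filters use Spark SQL's higher-order filter function over the Kafka headers column (an array<struct<key:string, value:binary>>) to pick out a single header and compare its value as a string. The following self-contained sketch runs the same expression against a hand-built DataFrame; the local SparkSession, the sample rows, and the id "42" (standing in for str(entity._id)) are made up for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.master("local[1]").appName("header-filter-sketch").getOrCreate()

# Mimic the Kafka source schema: headers is array<struct<key:string, value:binary>>.
rows = [
    ("payload-a", [("featureGroupId", bytearray(b"42")), ("subjectId", bytearray(b"7"))]),
    ("payload-b", [("featureGroupId", bytearray(b"99")), ("subjectId", bytearray(b"7"))]),
]
df = spark.createDataFrame(rows, "value string, headers array<struct<key:string, value:binary>>")

# Same expression as in the job: take the header whose key matches, then cast its
# binary value to a string before comparing it with the target feature group id.
feature_group_id = "42"  # hypothetical id
filtered_df = df.filter(
    expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)")
    == feature_group_id
)
filtered_df.show(truncate=False)  # keeps only payload-a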