Skip to content

Commit 5a7c938

Browse files
committed
small fix
1 parent aba04f7 commit 5a7c938

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

utils/python/hsfs_utils.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
286286
initial_check_point_string = kafka_engine.kafka_get_offsets(
287287
topic_name=entity._online_topic_name,
288288
feature_store_id=entity.feature_store_id,
289+
offline_write_options={},
289290
high=False,
290291
)
291292
offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
@@ -316,11 +317,11 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
316317
df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
317318
offset_dict = json.loads(offset_string)
318319
for offset_row in df_offsets:
319-
offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset
320+
offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
320321

321322
# save offsets
322323
offset_df = spark.createDataFrame([offset_dict])
323-
offset_df.write.mode("overwrite").json(offset_location)
324+
offset_df.coalesce(1).write.mode("overwrite").json(offset_location)
324325

325326
def _build_starting_offsets(initial_check_point_string: str):
326327
if not initial_check_point_string:

0 commit comments

Comments
 (0)