Commit c744892
initial_check_point_string
1 parent 97d6e35 commit c744892

File tree: 1 file changed (+23 -2 lines)


utils/python/hsfs_utils.py

+23 -2
@@ -259,7 +259,7 @@ def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
 
     entity.delta_vacuum()
 
-def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
+def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], initial_check_point_string: str) -> None:
     """
     Run materialization job on a feature group.
     """
@@ -276,6 +276,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) ->
         spark.read.format("kafka")
         .options(**read_options)
         .option("subscribe", entity._online_topic_name)
+        .option("startingOffsets", _build_starting_offsets(initial_check_point_string)) \
         .load()
     )
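Note: Spark's Kafka source accepts `startingOffsets` as a JSON string mapping each topic to per-partition starting offsets. A minimal standalone sketch of an equivalent reader, using a hypothetical topic name, broker address, and offsets (not values from this repo):

```python
# Standalone sketch assuming an existing SparkSession `spark`.
# "fg_topic_1", "broker:9092", and the offsets are hypothetical placeholders.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "fg_topic_1")
    # JSON of the form {"topic": {"partition": offset}}
    .option("startingOffsets", '{"fg_topic_1": {"0": 42, "1": 100}}')
    .load()
)
```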

@@ -285,6 +286,20 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) ->
     entity.stream = False  # to make sure we dont write to kafka
     entity.insert(deserialized_df)
 
+def _build_starting_offsets(initial_check_point_string: str):
+    if not initial_check_point_string:
+        return ""
+
+    # Split the input string into the topic and the partition-offset pairs
+    topic, offsets = initial_check_point_string.split(',', 1)
+
+    # Split the offsets and build a dictionary from them
+    offsets_dict = {partition: int(offset) for partition, offset in (pair.split(':') for pair in offsets.split(','))}
+
+    # Create the final dictionary structure
+    result = {topic: offsets_dict}
+
+    return json.dumps(result)
 
 if __name__ == "__main__":
     # Setup spark first so it fails faster in case of args errors
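The new helper expects the checkpoint string in the form `topic,partition:offset,partition:offset,...` and turns it into the JSON shape that `startingOffsets` consumes (it relies on `json` already being imported at module level, as `json.dumps` implies). A quick illustration with hypothetical values:

```python
>>> _build_starting_offsets("fg_topic_1,0:42,1:100")
'{"fg_topic_1": {"0": 42, "1": 100}}'
>>> _build_starting_offsets("")  # falsy input: no checkpoint to resume from
''
```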
@@ -325,6 +340,12 @@ def parse_isoformat_date(da: str) -> datetime:
         help="Job start time",
     )
 
+    parser.add_argument(
+        "-initialCheckPointString",
+        type=str,
+        help="Kafka offset to start consuming from",
+    )
+
     args = parser.parse_args()
     job_conf = read_job_conf(args.path)
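Since argparse derives the attribute name from the flag, the value surfaces as `args.initialCheckPointString`; when the flag is omitted it defaults to None, which `_build_starting_offsets` treats the same as an empty string. A reduced, runnable sketch of the argument handling (the `-op` and `-path` flags are inferred from their uses elsewhere in this file; the parsed values are hypothetical):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-op", type=str)    # inferred from args.op below
parser.add_argument("-path", type=str)  # inferred from read_job_conf(args.path)
parser.add_argument(
    "-initialCheckPointString",
    type=str,
    help="Kafka offset to start consuming from",
)

# Hypothetical invocation mirroring an offline materialization run
args = parser.parse_args([
    "-op", "offline_fg_materialization",
    "-path", "job_conf.json",
    "-initialCheckPointString", "fg_topic_1,0:42,1:100",
])
assert args.initialCheckPointString == "fg_topic_1,0:42,1:100"
```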

@@ -347,7 +368,7 @@ def parse_isoformat_date(da: str) -> datetime:
         elif args.op == "delta_vacuum_fg":
             delta_vacuum_fg(spark, job_conf)
         elif args.op == "offline_fg_materialization":
-            offline_fg_materialization(spark, job_conf)
+            offline_fg_materialization(spark, job_conf, args.initialCheckPointString)
 
         success = True
     except Exception:
