
Commit 0e92376

save offsets?

1 parent c744892 commit 0e92376

3 files changed: +35 −4 lines changed

python/hsfs/core/kafka_engine.py (+1 −1)

@@ -141,7 +141,7 @@ def kafka_get_offsets(
             offsets += f",{partition_metadata.id}:{consumer.get_watermark_offsets(partition)[tuple_value]}"
         consumer.close()

-        return f" -initialCheckPointString {topic_name + offsets}"
+        return f"{topic_name + offsets}"
     return ""
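With this change, kafka_get_offsets returns only the raw checkpoint string (topic name plus partition:offset pairs); the " -initialCheckPointString" flag is now prepended by the caller in python/hsfs/engine/python.py (next file). A minimal sketch of the two return shapes, using an invented topic name and offsets:

# Illustration only: topic name and offset values are made up.
topic_name = "fg_1_1_onlinefs"
offsets = ",0:42,1:17"  # built per partition as f",{partition_metadata.id}:{watermark_offset}"

before = f" -initialCheckPointString {topic_name + offsets}"  # old return value
after = f"{topic_name + offsets}"                              # new return value
print(after)  # fg_1_1_onlinefs,0:42,1:17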

python/hsfs/engine/python.py (+1 −1)

@@ -1401,7 +1401,7 @@ def _write_dataframe_kafka(
             now = datetime.now(timezone.utc)
             feature_group.materialization_job.run(
                 args=feature_group.materialization_job.config.get("defaultArgs", "")
-                + initial_check_point,
+                + (f" -initialCheckPointString {initial_check_point}" if initial_check_point else ""),
                 await_termination=offline_write_options.get("wait_for_job", False),
             )
         offline_backfill_every_hr = offline_write_options.pop(
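The caller now adds the -initialCheckPointString flag only when a checkpoint string is actually present, so the materialization job receives no flag at all when there is nothing to pass. A small sketch of how the args end up; default_args and the checkpoint value are invented for illustration:

# Hypothetical values, mirroring the expression above.
default_args = "-op offline_fg_materialization -path hdfs:///Projects/demo/Resources/config.json"

for initial_check_point in ("", "fg_1_1_onlinefs,0:42,1:17"):
    args = default_args + (
        f" -initialCheckPointString {initial_check_point}" if initial_check_point else ""
    )
    print(args)
# With an empty checkpoint the flag is omitted, and the job falls back to the
# offsets persisted at "<entity.location>_offsets" (see utils/python/hsfs_utils.py below).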

utils/python/hsfs_utils.py (+33 −2)

@@ -13,6 +13,7 @@
 hopsfs = pfs.HadoopFileSystem("default", user=os.environ["HADOOP_USER_NAME"])
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructField, StructType, _parse_datatype_string
+from pyspark.sql.functions import max

 import hopsworks

@@ -272,20 +273,50 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         entity.feature_store_id, {}, engine="spark"
     )

+    # get offsets
+    offset_location = entity.location + "_offsets"
+    try:
+        if initial_check_point_string:
+            offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
+        else:
+            offset_string = spark.read.json(offset_location).toJSON().first()
+    except Exception as e:
+        print(f"An unexpected error occurred: {e}")
+        # if all else fails read from the beggining
+        initial_check_point_string = kafka_engine.kafka_get_offsets(
+            topic_name=entity._online_topic_name,
+            feature_store_id=entity.feature_store_id,
+            high=False,
+        )
+        offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
+
+    # read kafka topic
     df = (
         spark.read.format("kafka")
         .options(**read_options)
         .option("subscribe", entity._online_topic_name)
-        .option("startingOffsets", _build_starting_offsets(initial_check_point_string)) \
+        .option("startingOffsets", offset_string)
+        .limit(5000000)
         .load()
     )

     # deserialize dataframe so that it can be properly saved
     deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)

+    # insert data
     entity.stream = False  # to make sure we dont write to kafka
     entity.insert(deserialized_df)

+    # update offsets
+    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
+    offset_dict = json.loads(offset_string)
+    for offset_row in df_offsets:
+        offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset
+
+    # save offsets
+    offset_df = spark.createDataFrame([offset_dict])
+    offset_df.write.mode("overwrite").json(offset_location)
+
 def _build_starting_offsets(initial_check_point_string: str):
     if not initial_check_point_string:
         return ""
@@ -299,7 +330,7 @@ def _build_starting_offsets(initial_check_point_string: str):
     # Create the final dictionary structure
     result = {topic: offsets_dict}

-    return json.dumps(result)
+    return result

 if __name__ == "__main__":
     # Setup spark first so it fails faster in case of args errors
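_build_starting_offsets now returns the offsets dictionary itself instead of a JSON string; both call sites in offline_fg_materialization wrap it in json.dumps themselves. A plausible, hedged reconstruction of the helper (its full body is not part of this diff, so the parsing details are assumptions) together with the shape it produces:

import json

def build_starting_offsets(initial_check_point_string: str):
    # Assumed parsing of "<topic>,<partition>:<offset>,..." into
    # {"<topic>": {"<partition>": <offset>}}; the real helper may differ in details.
    if not initial_check_point_string:
        return ""
    topic, *pairs = initial_check_point_string.split(",")
    offsets_dict = {p: int(o) for p, o in (pair.split(":") for pair in pairs)}
    # Create the final dictionary structure
    result = {topic: offsets_dict}
    return result

print(json.dumps(build_starting_offsets("fg_1_1_onlinefs,0:42,1:17")))
# {"fg_1_1_onlinefs": {"0": 42, "1": 17}}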
