Skip to content

Commit 44d2f6f

Browse files
authored
fix reconcile and prepare_spark
1 parent 9ddd618 commit 44d2f6f

File tree

6 files changed

+33
-14
lines changed

6 files changed

+33
-14
lines changed

python/hsfs/core/feature_group_engine.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ def append_features(self, feature_group, new_features):
285285
if feature_group.time_travel_format == "DELTA":
286286
engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
287287
else:
288-
engine.get_instance().save_empty_dataframe(feature_group)
288+
engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
289289

290290
def update_description(self, feature_group, description):
291291
"""Updates the description of a feature group."""

python/hsfs/core/hudi_engine.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020

2121

2222
class HudiEngine:
23+
24+
HUDI_SPEC_FEATURE_NAMES = ["_hoodie_record_key", "_hoodie_partition_path",
25+
"_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno"]
26+
2327
HUDI_SPARK_FORMAT = "org.apache.hudi"
2428
HUDI_TABLE_NAME = "hoodie.table.name"
2529
HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type"
@@ -229,9 +233,8 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options):
229233
def reconcile_hudi_schema(
230234
self, save_empty_dataframe_callback, hudi_fg_alias, read_options
231235
):
232-
fg_table_name = hudi_fg_alias.feature_group._get_table_name()
233236
if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted(
234-
self._spark_session.table(fg_table_name).columns
237+
[feature.name for feature in hudi_fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES
235238
):
236239
full_fg = self._feature_group_api.get(
237240
feature_store_id=hudi_fg_alias.feature_group._feature_store_id,

python/hsfs/engine/python.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1182,7 +1182,7 @@ def save_stream_dataframe(
11821182
)
11831183

11841184
def save_empty_dataframe(
1185-
self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]
1185+
self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None
11861186
) -> None:
11871187
"""Wrapper around save_dataframe in order to provide no-op."""
11881188
pass

python/hsfs/engine/spark.py

+16-8
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,9 @@ def register_hudi_temporary_table(
219219
read_options,
220220
)
221221

222-
if (hudi_fg_alias._feature_group.storage_connector is None):
223-
hudi_engine_instance.reconcile_hudi_schema(
224-
self.save_empty_dataframe, hudi_fg_alias, read_options
225-
)
222+
hudi_engine_instance.reconcile_hudi_schema(
223+
self.save_empty_dataframe, hudi_fg_alias, read_options
224+
)
226225

227226
def register_delta_temporary_table(
228227
self, delta_fg_alias, feature_store_id, feature_store_name, read_options
@@ -1250,13 +1249,22 @@ def is_spark_dataframe(self, dataframe):
12501249
return True
12511250
return False
12521251

1253-
def save_empty_dataframe(self, feature_group):
1254-
fg_table_name = feature_group._get_table_name()
1255-
dataframe = self._spark_session.table(fg_table_name).limit(0)
1252+
def save_empty_dataframe(self, feature_group, new_features=None):
1253+
dataframe = self._spark_session.read.format("hudi").load(
1254+
feature_group.get_uri()
1255+
)
1256+
1257+
if (new_features is not None):
1258+
if isinstance(new_features, list):
1259+
for new_feature in new_features:
1260+
dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
1261+
else:
1262+
dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
1263+
12561264

12571265
self.save_dataframe(
12581266
feature_group,
1259-
dataframe,
1267+
dataframe.limit(0),
12601268
"upsert",
12611269
feature_group.online_enabled,
12621270
"offline",

python/hsfs/storage_connector.py

+6
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ def description(self) -> Optional[str]:
134134
def spark_options(self) -> None:
135135
pass
136136

137+
def prepare_spark(self, path: Optional[str] = None) -> Optional[str]:
138+
_logger.info(
139+
"This Storage Connector cannot be prepared for Spark."
140+
)
141+
return path
142+
137143
def read(
138144
self,
139145
query: Optional[str] = None,

python/tests/engine/test_spark.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -4559,7 +4559,9 @@ def test_save_empty_dataframe(self, mocker):
45594559
mock_spark_engine_save_dataframe = mocker.patch(
45604560
"hsfs.engine.spark.Engine.save_dataframe"
45614561
)
4562-
mock_spark_table = mocker.patch("pyspark.sql.session.SparkSession.table")
4562+
mock_spark_read = mocker.patch("pyspark.sql.SparkSession.read")
4563+
mock_format = mocker.Mock()
4564+
mock_spark_read.format.return_value = mock_format
45634565

45644566
# Arrange
45654567
spark_engine = spark.Engine()
@@ -4579,7 +4581,7 @@ def test_save_empty_dataframe(self, mocker):
45794581

45804582
# Assert
45814583
assert mock_spark_engine_save_dataframe.call_count == 1
4582-
assert mock_spark_table.call_count == 1
4584+
assert mock_spark_read.format.call_count == 1
45834585

45844586
def test_apply_transformation_function_single_output(self, mocker):
45854587
# Arrange

0 commit comments

Comments
 (0)