Skip to content

Commit 714981e

Browse files
committed Oct 18, 2024
update to append_features
1 parent dea2e25 commit 714981e

File tree

3 files changed

+12
-21
lines changed

3 files changed

+12
-21
lines changed
 

‎python/hsfs/core/feature_group_engine.py

+2-2
Original file line number | Diff line number | Diff line change
@@ -297,9 +297,9 @@ def append_features(self, feature_group, new_features):
297297

298298
# write empty dataframe to update parquet schema
299299
if feature_group.time_travel_format == "DELTA":
300-
engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
300+
engine.get_instance().add_cols_to_delta_table(feature_group)
301301
else:
302-
engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
302+
engine.get_instance().save_empty_dataframe(feature_group)
303303

304304
def update_description(self, feature_group, description):
305305
"""Updates the description of a feature group."""

‎python/hsfs/engine/python.py

+2-4
Original file line number | Diff line number | Diff line change
@@ -1212,13 +1212,11 @@ def save_stream_dataframe(
12121212
"Stream ingestion is not available on Python environments, because it requires Spark as engine."
12131213
)
12141214

1215-
def save_empty_dataframe(
1216-
self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None
1217-
) -> None:
1215+
def save_empty_dataframe(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None:
12181216
"""Wrapper around save_dataframe in order to provide no-op."""
12191217
pass
12201218

1221-
def add_cols_to_delta_table(self, feature_group: FeatureGroup, new_features) -> None:
1219+
def add_cols_to_delta_table(self, feature_group: FeatureGroup) -> None:
12221220
"""Wrapper around add_cols_to_delta_table in order to provide no-op."""
12231221
pass
12241222

‎python/hsfs/engine/spark.py

+8-15
Original file line number | Diff line number | Diff line change
@@ -1324,18 +1324,14 @@ def is_spark_dataframe(self, dataframe):
13241324
return True
13251325
return False
13261326

1327-
def save_empty_dataframe(self, feature_group, new_features=None):
1327+
def save_empty_dataframe(self, feature_group):
13281328
location = feature_group.prepare_spark_location()
13291329

13301330
dataframe = self._spark_session.read.format("hudi").load(location)
13311331

1332-
if (new_features is not None):
1333-
if isinstance(new_features, list):
1334-
for new_feature in new_features:
1335-
dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type))
1336-
else:
1337-
dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type))
1338-
1332+
for feature in feature_group.features:
1333+
if feature.name not in dataframe.columns:
1334+
dataframe = dataframe.withColumn(feature.name, lit(None).cast(feature.type))
13391335

13401336
self.save_dataframe(
13411337
feature_group,
@@ -1347,17 +1343,14 @@ def save_empty_dataframe(self, feature_group, new_features=None):
13471343
{},
13481344
)
13491345

1350-
def add_cols_to_delta_table(self, feature_group, new_features):
1346+
def add_cols_to_delta_table(self, feature_group):
13511347
location = feature_group.prepare_spark_location()
13521348

13531349
dataframe = self._spark_session.read.format("delta").load(location)
13541350

1355-
if (new_features is not None):
1356-
if isinstance(new_features, list):
1357-
for new_feature in new_features:
1358-
dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
1359-
else:
1360-
dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
1351+
for feature in feature_group.features:
1352+
if feature.name not in dataframe.columns:
1353+
dataframe = dataframe.withColumn(feature.name, lit(None).cast(feature.type))
13611354

13621355
dataframe.limit(0).write.format("delta").mode(
13631356
"append"

0 commit comments

Comments (0)