Skip to content

Commit 8aeeb03

Browse files
committed
fix add_cols_to_delta_table
1 parent 44d2f6f commit 8aeeb03

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

python/hsfs/engine/spark.py

+11-9
Original file line numberDiff line numberDiff line change
@@ -1273,16 +1273,18 @@ def save_empty_dataframe(self, feature_group, new_features=None):
12731273
)
12741274

12751275
def add_cols_to_delta_table(self, feature_group, new_features):
1276-
new_features_map = {}
1277-
if isinstance(new_features, list):
1278-
for new_feature in new_features:
1279-
new_features_map[new_feature.name] = lit("").cast(new_feature.type)
1280-
else:
1281-
new_features_map[new_features.name] = lit("").cast(new_features.type)
1282-
1283-
self._spark_session.read.format("delta").load(
1276+
dataframe = self._spark_session.read.format("delta").load(
12841277
feature_group.get_uri()
1285-
).withColumns(new_features_map).limit(0).write.format("delta").mode(
1278+
)
1279+
1280+
if (new_features is not None):
1281+
if isinstance(new_features, list):
1282+
for new_feature in new_features:
1283+
dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
1284+
else:
1285+
dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
1286+
1287+
dataframe.limit(0).write.format("delta").mode(
12861288
"append"
12871289
).option("mergeSchema", "true").option(
12881290
"spark.databricks.delta.schema.autoMerge.enabled", "true"

0 commit comments

Comments
 (0)