Skip to content

Commit 9ddd618

Browse files
authored
small fixes
* tmp1 * tmp 2 * tmp 3 * prepare_spark
1 parent 59f2c98 commit 9ddd618

File tree

3 files changed

+13
-25
lines changed

3 files changed

+13
-25
lines changed

python/hsfs/constructor/fs_query.py

+10-15
Original file line numberDiff line numberDiff line change
@@ -99,21 +99,16 @@ def register_external(
9999
Union[TypeVar("pyspark.sql.DataFrame"), TypeVar("pyspark.RDD")]
100100
] = None,
101101
) -> None:
102-
if self._on_demand_fg_aliases is not None:
103-
for external_fg_alias in self._on_demand_fg_aliases:
104-
if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup":
105-
external_fg_alias.on_demand_feature_group.dataframe = spine
106-
engine.get_instance().register_external_temporary_table(
107-
external_fg_alias.on_demand_feature_group,
108-
external_fg_alias.alias,
109-
)
110-
111-
if self._hudi_cached_feature_groups is not None:
112-
for external_fg_alias in self._hudi_cached_feature_groups:
113-
engine.get_instance().register_external_temporary_table(
114-
external_fg_alias.feature_group,
115-
external_fg_alias.alias,
116-
)
102+
if self._on_demand_fg_aliases is None:
103+
return
104+
105+
for external_fg_alias in self._on_demand_fg_aliases:
106+
if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup":
107+
external_fg_alias.on_demand_feature_group.dataframe = spine
108+
engine.get_instance().register_external_temporary_table(
109+
external_fg_alias.on_demand_feature_group,
110+
external_fg_alias.alias,
111+
)
117112

118113
def register_hudi_tables(
119114
self,

python/hsfs/engine/spark.py

+2-9
Original file line numberDiff line numberDiff line change
@@ -188,22 +188,15 @@ def set_job_group(self, group_id, description):
188188
self._spark_session.sparkContext.setJobGroup(group_id, description)
189189

190190
def register_external_temporary_table(self, external_fg, alias):
191-
if isinstance(external_fg, fg_mod.ExternalFeatureGroup):
191+
if not isinstance(external_fg, fg_mod.SpineGroup):
192192
external_dataset = external_fg.storage_connector.read(
193193
external_fg.query,
194194
external_fg.data_format,
195195
external_fg.options,
196196
external_fg.get_uri(),
197197
)
198-
elif isinstance(external_fg, fg_mod.SpineGroup):
199-
external_dataset = external_fg.dataframe
200198
else:
201-
external_dataset = external_fg.storage_connector.read(
202-
None,
203-
external_fg.time_travel_format,
204-
None,
205-
external_fg.get_uri(),
206-
)
199+
external_dataset = external_fg.dataframe
207200
if external_fg.location:
208201
self._spark_session.sparkContext.textFile(external_fg.location).collect()
209202

python/hsfs/feature_group.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2074,7 +2074,7 @@ def get_uri(self) -> str:
20742074
else:
20752075
path = self.storage_connector._get_path(self.path)
20762076
if engine.get_type().startswith("spark"):
2077-
path = engine.get_instance().setup_storage_connector(self.storage_connector, path)
2077+
path = self.storage_connector.prepare_spark(path)
20782078
return path
20792079

20802080
@property

0 commit comments

Comments
 (0)