Skip to content

Commit 91bea60

Browse files
committed
get_uri()
1 parent 4ea0141 commit 91bea60

File tree

4 files changed

+16
-17
lines changed

4 files changed

+16
-17
lines changed

python/hsfs/core/delta_engine.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def register_temporary_table(self, delta_fg_alias, read_options):
5555
delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options)
5656
self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options(
5757
**delta_options
58-
).load(self._feature_group.uri).createOrReplaceTempView(
58+
).load(self._feature_group.get_uri()).createOrReplaceTempView(
5959
delta_fg_alias.alias
6060
)
6161

@@ -86,14 +86,14 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options):
8686

8787
def delete_record(self, delete_df):
8888
if not DeltaTable.isDeltaTable(
89-
self._spark_session, self._feature_group.uri
89+
self._spark_session, self._feature_group.get_uri()
9090
):
9191
raise FeatureStoreException(
9292
f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled "
9393
)
9494
else:
9595
fg_source_table = DeltaTable.forPath(
96-
self._spark_session, self._feature_group.uri
96+
self._spark_session, self._feature_group.get_uri()
9797
)
9898

9999
source_alias = (
@@ -109,7 +109,7 @@ def delete_record(self, delete_df):
109109
).whenMatchedDelete().execute()
110110

111111
fg_commit = self._get_last_commit_metadata(
112-
self._spark_session, self._feature_group.uri
112+
self._spark_session, self._feature_group.get_uri()
113113
)
114114
return self._feature_group_api.commit(self._feature_group, fg_commit)
115115

@@ -118,7 +118,7 @@ def _write_delta_dataset(self, dataset, write_options):
118118
write_options = {}
119119

120120
if not DeltaTable.isDeltaTable(
121-
self._spark_session, self._feature_group.uri
121+
self._spark_session, self._feature_group.get_uri()
122122
):
123123
(
124124
dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT)
@@ -129,11 +129,11 @@ def _write_delta_dataset(self, dataset, write_options):
129129
else []
130130
)
131131
.mode("append")
132-
.save(self._feature_group.uri)
132+
.save(self._feature_group.get_uri())
133133
)
134134
else:
135135
fg_source_table = DeltaTable.forPath(
136-
self._spark_session, self._feature_group.uri
136+
self._spark_session, self._feature_group.get_uri()
137137
)
138138

139139
source_alias = (
@@ -149,7 +149,7 @@ def _write_delta_dataset(self, dataset, write_options):
149149
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
150150

151151
return self._get_last_commit_metadata(
152-
self._spark_session, self._feature_group.uri
152+
self._spark_session, self._feature_group.get_uri()
153153
)
154154

155155
def _generate_merge_query(self, source_alias, updates_alias):

python/hsfs/core/hudi_engine.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,18 @@ def register_temporary_table(self, hudi_fg_alias, read_options):
103103
hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options)
104104
self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options(
105105
**hudi_options
106-
).load(self._feature_group.uri).createOrReplaceTempView(
106+
).load(self._feature_group.get_uri()).createOrReplaceTempView(
107107
hudi_fg_alias.alias
108108
)
109109

110110
def _write_hudi_dataset(self, dataset, save_mode, operation, write_options):
111111
hudi_options = self._setup_hudi_write_opts(operation, write_options)
112112
dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode(
113113
save_mode
114-
).save(self._feature_group.uri)
114+
).save(self._feature_group.get_uri())
115115

116116
feature_group_commit = self._get_last_commit_metadata(
117-
self._spark_context, self._feature_group.uri
117+
self._spark_context, self._feature_group.get_uri()
118118
)
119119

120120
return feature_group_commit

python/hsfs/engine/spark.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def register_external_temporary_table(self, external_fg, alias):
193193
external_fg.query,
194194
external_fg.data_format,
195195
external_fg.options,
196-
external_fg.uri,
196+
external_fg.get_uri(),
197197
)
198198
elif isinstance(external_fg, fg_mod.SpineGroup):
199199
external_dataset = external_fg.dataframe
@@ -202,7 +202,7 @@ def register_external_temporary_table(self, external_fg, alias):
202202
None,
203203
external_fg.time_travel_format,
204204
None,
205-
external_fg.uri,
205+
external_fg.get_uri(),
206206
)
207207
if external_fg.location:
208208
self._spark_session.sparkContext.textFile(external_fg.location).collect()
@@ -1267,12 +1267,12 @@ def add_cols_to_delta_table(self, feature_group, new_features):
12671267
new_features_map[new_features.name] = lit("").cast(new_features.type)
12681268

12691269
self._spark_session.read.format("delta").load(
1270-
feature_group.uri
1270+
feature_group.get_uri()
12711271
).withColumns(new_features_map).limit(0).write.format("delta").mode(
12721272
"append"
12731273
).option("mergeSchema", "true").option(
12741274
"spark.databricks.delta.schema.autoMerge.enabled", "true"
1275-
).save(feature_group.uri)
1275+
).save(feature_group.get_uri())
12761276

12771277
def _apply_transformation_function(
12781278
self,

python/hsfs/feature_group.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -2064,8 +2064,7 @@ def path(self) -> Optional[str]:
20642064
def storage_connector(self) -> "sc.StorageConnector":
20652065
return self._storage_connector
20662066

2067-
@property
2068-
def uri(self) -> str:
2067+
def get_uri(self) -> str:
20692068
"""Location of data."""
20702069
if (self.storage_connector is None):
20712070
return self.location

0 commit comments

Comments (0)