Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1537] Managed feature group test #348

Merged
merged 39 commits into from
Oct 2, 2024
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
11ed246
init
bubriks Sep 11, 2024
ec6aa89
add region
bubriks Sep 11, 2024
6bc8e79
fix ruff
bubriks Sep 11, 2024
14363dc
Merge branch 'main' into FSTORE-1537
bubriks Sep 16, 2024
6b66989
temp
bubriks Sep 18, 2024
b3fa7e3
s3 -> s3a
bubriks Sep 18, 2024
07f3a35
time_travel_format
bubriks Sep 18, 2024
5d7dff6
s3 -> s3a
bubriks Sep 18, 2024
23670c7
s3 issue continues
bubriks Sep 18, 2024
8922c81
prepare_spark
bubriks Sep 18, 2024
6b1a53c
path fix?
bubriks Sep 18, 2024
150f836
reconcile_hudi_schema
bubriks Sep 18, 2024
d7ea12f
small undo
bubriks Sep 18, 2024
6db0d1c
remove textFile
bubriks Sep 19, 2024
600b726
register_temporary_table
bubriks Sep 19, 2024
d0738d4
add uri to FG
bubriks Sep 19, 2024
f6da8a3
hive_sync
bubriks Sep 19, 2024
d54c370
fix s3 replace
bubriks Sep 19, 2024
da9d1a4
fix ruff
bubriks Sep 19, 2024
4ea0141
fix some tests
bubriks Sep 19, 2024
91bea60
get_uri()
bubriks Sep 19, 2024
3447281
setup_storage_connector
bubriks Sep 19, 2024
3f048fe
read write from managed FG
bubriks Sep 19, 2024
993649e
Merge branch 'main' into FSTORE-1537
bubriks Sep 19, 2024
700e2a8
fix merge
bubriks Sep 19, 2024
59f2c98
test fix
bubriks Sep 19, 2024
9ddd618
small fixes
bubriks Sep 23, 2024
44d2f6f
fix reconcile and prepare_spark
bubriks Sep 23, 2024
8aeeb03
fix add_cols_to_delta_table
bubriks Sep 23, 2024
d852a34
fix schema evolution hudi
bubriks Sep 24, 2024
c92a1d5
reduce number of get_uri calls
bubriks Sep 24, 2024
4447e3b
get_uri -> prepare_spark_location
bubriks Sep 25, 2024
1cb68ff
Merge branch 'main' into FSTORE-1537
bubriks Sep 25, 2024
0303ad9
add clean fg (#7)
bubriks Sep 30, 2024
2a22b56
Merge branch 'main' into FSTORE-1537
bubriks Oct 1, 2024
626fb12
add tests for clean
bubriks Oct 2, 2024
9ffa117
Merge branch 'managed_feature_group' into FSTORE-1537
bubriks Oct 2, 2024
de7e29a
test_prepare_spark_location
bubriks Oct 2, 2024
abec75e
ruff fix
bubriks Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
temp
  • Loading branch information
bubriks committed Sep 18, 2024
commit 6b669890e92f839d8f65e64aba3d90f2b2a9ea49
25 changes: 15 additions & 10 deletions python/hsfs/constructor/fs_query.py
Original file line number Diff line number Diff line change
@@ -99,16 +99,21 @@ def register_external(
Union[TypeVar("pyspark.sql.DataFrame"), TypeVar("pyspark.RDD")]
] = None,
) -> None:
if self._on_demand_fg_aliases is None:
return

for external_fg_alias in self._on_demand_fg_aliases:
if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup":
external_fg_alias.on_demand_feature_group.dataframe = spine
engine.get_instance().register_external_temporary_table(
external_fg_alias.on_demand_feature_group,
external_fg_alias.alias,
)
if self._on_demand_fg_aliases is not None:
for external_fg_alias in self._on_demand_fg_aliases:
if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup":
external_fg_alias.on_demand_feature_group.dataframe = spine
engine.get_instance().register_external_temporary_table(
external_fg_alias.on_demand_feature_group,
external_fg_alias.alias,
)

if self._hudi_cached_feature_groups is not None:
for external_fg_alias in self._hudi_cached_feature_groups:
engine.get_instance().register_external_temporary_table(
external_fg_alias.feature_group,
external_fg_alias.alias,
)

def register_hudi_tables(
self,
9 changes: 7 additions & 2 deletions python/hsfs/core/hudi_engine.py
Original file line number Diff line number Diff line change
@@ -108,13 +108,18 @@ def register_temporary_table(self, hudi_fg_alias, read_options):
)

def _write_hudi_dataset(self, dataset, save_mode, operation, write_options):
if (self._feature_group.storage_connector is None):
location = self._feature_group.location
else:
location = self._feature_group.storage_connector._get_path(self._feature_group.path)

hudi_options = self._setup_hudi_write_opts(operation, write_options)
dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode(
save_mode
).save(self._feature_group.location)
).save(location)

feature_group_commit = self._get_last_commit_metadata(
self._spark_context, self._feature_group.location
self._spark_context, location
)

return feature_group_commit
9 changes: 8 additions & 1 deletion python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
@@ -188,13 +188,20 @@ def set_job_group(self, group_id, description):
self._spark_session.sparkContext.setJobGroup(group_id, description)

def register_external_temporary_table(self, external_fg, alias):
if not isinstance(external_fg, fg_mod.SpineGroup):
if isinstance(external_fg, fg_mod.ExternalFeatureGroup):
external_dataset = external_fg.storage_connector.read(
external_fg.query,
external_fg.data_format,
external_fg.options,
external_fg.storage_connector._get_path(external_fg.path),
)
elif isinstance(external_fg, fg_mod.FeatureGroup):
external_dataset = external_fg.storage_connector.read(
None,
external_fg.timeTravelFormat,
None,
external_fg.storage_connector._get_path(external_fg.path),
)
else:
external_dataset = external_fg.dataframe
if external_fg.location: