From 007d4b428e8b89e38ac14f78f85cc779843ed975 Mon Sep 17 00:00:00 2001 From: kenneth Date: Thu, 12 Sep 2024 16:12:55 +0200 Subject: [PATCH 1/8] clean up add DeltaCachedFeatureGroups read delta s3 python read write delta s3 add delta to dev add delta read delta on s3 --- python/hsfs/constructor/fs_query.py | 11 ++++++++++- python/hsfs/core/arrow_flight_client.py | 26 +++++++++++++++++++++++-- python/hsfs/core/delta_engine.py | 2 ++ python/hsfs/engine/python.py | 3 +++ python/hsfs/feature_group.py | 3 +-- python/hsfs/storage_connector.py | 10 ++++++++++ python/pyproject.toml | 2 ++ 7 files changed, 52 insertions(+), 5 deletions(-) diff --git a/python/hsfs/constructor/fs_query.py b/python/hsfs/constructor/fs_query.py index b944006f3..b367d5b13 100644 --- a/python/hsfs/constructor/fs_query.py +++ b/python/hsfs/constructor/fs_query.py @@ -35,6 +35,7 @@ def __init__( expand: Optional[List[str]] = None, items: Optional[List[Dict[str, Any]]] = None, type: Optional[str] = None, + delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None, **kwargs, ) -> None: self._query = query @@ -60,6 +61,14 @@ def __init__( else: self._hudi_cached_feature_groups = [] + if delta_cached_feature_groups is not None: + self._delta_cached_feature_groups = [ + hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg) + for fg in delta_cached_feature_groups + ] + else: + self._delta_cached_feature_groups = [] + @classmethod def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery": json_decamelized = humps.decamelize(json_dict) @@ -127,7 +136,7 @@ def register_delta_tables( feature_store_name: str, read_options: Optional[Dict[str, Any]], ) -> None: - for hudi_fg in self._hudi_cached_feature_groups: + for hudi_fg in self._delta_cached_feature_groups: engine.get_instance().register_delta_temporary_table( hudi_fg, feature_store_id, feature_store_name, read_options ) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 53bb96a86..133e75093 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -124,7 +124,13 @@ def _is_query_supported_rec(query: query.Query): and query._left_feature_group.storage_connector.type in ArrowFlightClient.SUPPORTED_EXTERNAL_CONNECTORS ) - supported = hudi_no_time_travel or supported_connector + delta_s3 = ( + isinstance(query._left_feature_group, feature_group.FeatureGroup) + and query._left_feature_group.time_travel_format == "DELTA" + and query._left_feature_group.storage_connector + and query._left_feature_group.storage_connector.type == StorageConnector.S3 + ) + supported = hudi_no_time_travel or supported_connector or delta_s3 for j in query._joins: supported &= _is_query_supported_rec(j._query) return supported @@ -549,6 +555,7 @@ def enabled_on_cluster(self) -> bool: def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases): connector = {} if isinstance(fg, feature_group.ExternalFeatureGroup): + connector["time_travel_type"] = None connector["type"] = fg.storage_connector.type connector["options"] = fg.storage_connector.connector_options() connector["query"] = fg.query[:-1] if fg.query.endswith(";") else fg.query @@ -566,8 +573,23 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases): connector["filters"] = _serialize_filter_expression( join_obj._query._filter, join_obj._query, True ) + elif fg.time_travel_format == "DELTA": + connector["time_travel_type"] = "delta" + connector["type"] = fg.storage_connector.type + connector["options"] = fg.storage_connector.connector_options() + connector["query"] = "" + if query._left_feature_group == fg: + connector["filters"] = _serialize_filter_expression( + query._filter, query, True + ) + else: + for join_obj in query._joins: + if join_obj._query._left_feature_group == fg: + connector["filters"] = _serialize_filter_expression( + join_obj._query._filter, join_obj._query, True + ) else: - connector["type"] = "hudi" + connector["time_travel_type"] = "hudi" return connector diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 0032daddf..ebd80e84e 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -15,6 +15,8 @@ # from __future__ import annotations +import os + from hopsworks_common.client.exceptions import FeatureStoreException from hsfs import feature_group_commit, util from hsfs.core import feature_group_api diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 512803ebd..d6877bca5 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -504,6 +504,9 @@ def register_hudi_temporary_table( + "environment with Spark Engine." ) + def register_delta_temporary_table(*args, **kwargs): + pass + def profile_by_spark( self, metadata_instance: Union[ diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index f47f1d962..3809fca91 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -148,7 +148,7 @@ def __init__( self._name = name self.event_time = event_time self._online_enabled = online_enabled - self._location = location + self._location = location.replace("hopsfs://", "hdfs://") if location else location # TODO ken: revert it self._id = id self._subject = None self._online_topic_name = online_topic_name @@ -166,7 +166,6 @@ def __init__( ) else: self._storage_connector: "sc.StorageConnector" = storage_connector - self._online_config = ( OnlineConfig.from_response_json(online_config) if isinstance(online_config, dict) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index c7ba70dc2..e6015cad4 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -369,6 +369,16 @@ def prepare_spark(self, path: Optional[str] = None) -> Optional[str]: """ return engine.get_instance().setup_storage_connector(self, path) + def connector_options(self) -> Dict[str, Any]: + """Return options to be passed to an external S3 connector library""" + return { + "access_key": self.access_key, + "secret_key": self.secret_key, + "session_token": self.session_token, + "path": self.path, + "region": "us-east-2", + } + def read( self, query: Optional[str] = None, diff --git a/python/pyproject.toml b/python/pyproject.toml index 6cd64077e..5efacb180 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -74,6 +74,7 @@ dev-no-opt = [ "pyspark==3.1.1", "moto[s3]==5.0.0", "typeguard==4.2.1", + "delta-spark==1.0.1" ] dev-pandas1 = [ "hopsworks[python]", @@ -84,6 +85,7 @@ dev-pandas1 = [ "moto[s3]==5.0.0", "pandas<=1.5.3", "sqlalchemy<=1.4.48", + "delta-spark==1.0.1" ] dev = ["hopsworks[dev-no-opt,great-expectations,polars]"] polars=["polars>=0.20.18,<=0.21.0"] From cf91b53e31e298a462c7fd7ce3803bc93f4e67ac Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 13:05:50 +0200 Subject: [PATCH 2/8] clean up --- python/hsfs/core/delta_engine.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index ebd80e84e..0032daddf 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -15,8 +15,6 @@ # from __future__ import annotations -import os - from hopsworks_common.client.exceptions import FeatureStoreException from hsfs import feature_group_commit, util from hsfs.core import feature_group_api From 605367c5eeffc8b530726180aa99296866935cd2 Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 13:06:36 +0200 Subject: [PATCH 3/8] clean up --- python/hsfs/feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 3809fca91..ac39503cc 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -148,7 +148,7 @@ def __init__( self._name = name self.event_time = event_time self._online_enabled = online_enabled - self._location = location.replace("hopsfs://", "hdfs://") if location else location # TODO ken: revert it + self._location = location self._id = id self._subject = None self._online_topic_name = online_topic_name From 0e04d94456e75d30875a1490cddb7297dfbcb7ca Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 13:35:38 +0200 Subject: [PATCH 4/8] fix style --- python/hsfs/engine/python.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index d6877bca5..512803ebd 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -504,9 +504,6 @@ def register_hudi_temporary_table( + "environment with Spark Engine." ) - def register_delta_temporary_table(*args, **kwargs): - pass - def profile_by_spark( self, metadata_instance: Union[ From 375a2d3ef3ff887088410ae7dfc5bfbc591b5978 Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 14:03:53 +0200 Subject: [PATCH 5/8] fix test --- python/tests/core/test_arrow_flight_client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 0f480d12f..e67006d67 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -252,7 +252,7 @@ def test_construct_query_object(self, mocker, backend_fixtures): "right_filter": None, }, }, - "connectors": {"test.fg_test_1": {"type": "hudi"}}, + "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, } query_object["features"] = { @@ -293,7 +293,7 @@ def test_construct_query_object_datetime_filter(self, mocker, backend_fixtures): }, "right_filter": None, }, - "connectors": {"test.fg_test_1": {"type": "hudi"}}, + "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, } query_object["features"] = { @@ -331,7 +331,7 @@ def test_construct_query_object_without_fs(self, mocker, backend_fixtures): }, "right_filter": None, }, - "connectors": {"test.fg_test_1": {"type": "hudi"}}, + "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, } query_object["features"] = { @@ -369,7 +369,7 @@ def test_construct_query_object_without_fs_excluded(self, mocker, backend_fixtur }, "right_filter": None, }, - "connectors": {"test.fg_test_1": {"type": "hudi"}}, + "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, } query_object["features"] = { @@ -430,7 +430,8 @@ def test_construct_query_object_snowflake(self, mocker, backend_fixtures): }, "connectors": { "test.tpch1snowflake_1": { - "type": "SNOWFLAKE", + "time_travel_type": None, + "type": 'SNOWFLAKE', "options": { "user": "test_user", "account": "test_url", From 1d4874c15211d2c374b2428358ec58f777b3fa68 Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 14:05:14 +0200 Subject: [PATCH 6/8] fix region --- python/hsfs/storage_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index e6015cad4..0eaa58275 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -376,7 +376,7 @@ def connector_options(self) -> Dict[str, Any]: "secret_key": self.secret_key, "session_token": self.session_token, "path": self.path, - "region": "us-east-2", + "region": self.region, } def read( From 390a8ec013d8e40d5e6c9d20892c99a50453f0b5 Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 14:24:24 +0200 Subject: [PATCH 7/8] fix s3 path --- python/hsfs/core/arrow_flight_client.py | 2 ++ python/hsfs/storage_connector.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 133e75093..21a642369 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -577,6 +577,8 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases): connector["time_travel_type"] = "delta" connector["type"] = fg.storage_connector.type connector["options"] = fg.storage_connector.connector_options() + if fg.storage_connector.type == StorageConnector.S3: + connector["path"] = fg.location connector["query"] = "" if query._left_feature_group == fg: connector["filters"] = _serialize_filter_expression( diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 0eaa58275..5f9d8c532 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -375,7 +375,6 @@ def connector_options(self) -> Dict[str, Any]: "access_key": self.access_key, "secret_key": self.secret_key, "session_token": self.session_token, - "path": self.path, "region": self.region, } From 5df39cff875fced945608e794648776845c8e0f6 Mon Sep 17 00:00:00 2001 From: kenneth Date: Tue, 1 Oct 2024 15:29:22 +0200 Subject: [PATCH 8/8] fix s3 path --- python/hsfs/core/arrow_flight_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 21a642369..d4a4602a8 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -578,7 +578,7 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases): connector["type"] = fg.storage_connector.type connector["options"] = fg.storage_connector.connector_options() if fg.storage_connector.type == StorageConnector.S3: - connector["path"] = fg.location + connector["options"]["path"] = fg.location connector["query"] = "" if query._left_feature_group == fg: connector["filters"] = _serialize_filter_expression(