
Commit 7de5a12

[FSTORE-1536] Support delta table in python client (#347)
* clean up add DeltaCachedFeatureGroups read delta s3 python read write delta s3 add delta to dev add delta read delta on s3
* clean up
* clean up
* fix style
* fix test
* fix region
* fix s3 path
* fix s3 path
1 parent: aa1fd95


6 files changed: +53 -9 lines changed

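In user-facing terms, this commit lets the Python-only client read feature groups whose time_travel_format is "DELTA", currently when they are backed by an S3 storage connector. A minimal usage sketch, assuming a Hopsworks project that already contains such a feature group (the feature group name and version below are placeholders):

import hopsworks

# Placeholder names; assumes a feature group created with time_travel_format="DELTA"
# and backed by an S3 storage connector.
project = hopsworks.login()
fs = project.get_feature_store()
fg = fs.get_feature_group("transactions_delta", version=1)

# With this change the pure-Python client (no Spark session) can serve the read,
# e.g. through the Arrow Flight query-service path when it is available.
df = fg.read()
print(df.head())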

python/hsfs/constructor/fs_query.py

+10 -1

@@ -35,6 +35,7 @@ def __init__(
         expand: Optional[List[str]] = None,
         items: Optional[List[Dict[str, Any]]] = None,
         type: Optional[str] = None,
+        delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
         **kwargs,
     ) -> None:
         self._query = query
@@ -60,6 +61,14 @@ def __init__(
         else:
             self._hudi_cached_feature_groups = []
 
+        if delta_cached_feature_groups is not None:
+            self._delta_cached_feature_groups = [
+                hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg)
+                for fg in delta_cached_feature_groups
+            ]
+        else:
+            self._delta_cached_feature_groups = []
+
     @classmethod
     def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery":
         json_decamelized = humps.decamelize(json_dict)
@@ -127,7 +136,7 @@ def register_delta_tables(
         feature_store_name: str,
         read_options: Optional[Dict[str, Any]],
     ) -> None:
-        for hudi_fg in self._hudi_cached_feature_groups:
+        for hudi_fg in self._delta_cached_feature_groups:
             engine.get_instance().register_delta_temporary_table(
                 hudi_fg, feature_store_id, feature_store_name, read_options
             )
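register_delta_tables now iterates the Delta-cached feature groups and hands each one to engine.get_instance().register_delta_temporary_table. As a conceptual sketch only, not the hsfs engine implementation, registering a Delta table as a Spark temporary view under the alias the generated SQL expects could look like the following; the session options match the delta-spark==1.0.1 / pyspark==3.1.1 pins added below, and the alias and S3 path are hypothetical:

from pyspark.sql import SparkSession

# Spark session with Delta Lake support (settings required for delta-spark 1.0.x on Spark 3.1).
spark = (
    SparkSession.builder.appName("delta-temp-table-sketch")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

def register_delta_temporary_table(alias: str, location: str) -> None:
    # Load the Delta table from its storage location and expose it under the
    # alias used by the feature-store query.
    spark.read.format("delta").load(location).createOrReplaceTempView(alias)

register_delta_temporary_table("fg_transactions_1", "s3a://my-bucket/my_fs/fg_transactions_1")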

python/hsfs/core/arrow_flight_client.py

+26 -2

@@ -124,7 +124,13 @@ def _is_query_supported_rec(query: query.Query):
         and query._left_feature_group.storage_connector.type
         in ArrowFlightClient.SUPPORTED_EXTERNAL_CONNECTORS
     )
-    supported = hudi_no_time_travel or supported_connector
+    delta_s3 = (
+        isinstance(query._left_feature_group, feature_group.FeatureGroup)
+        and query._left_feature_group.time_travel_format == "DELTA"
+        and query._left_feature_group.storage_connector
+        and query._left_feature_group.storage_connector.type == StorageConnector.S3
+    )
+    supported = hudi_no_time_travel or supported_connector or delta_s3
     for j in query._joins:
         supported &= _is_query_supported_rec(j._query)
     return supported
@@ -549,6 +555,7 @@ def enabled_on_cluster(self) -> bool:
 def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
     connector = {}
     if isinstance(fg, feature_group.ExternalFeatureGroup):
+        connector["time_travel_type"] = None
         connector["type"] = fg.storage_connector.type
         connector["options"] = fg.storage_connector.connector_options()
         connector["query"] = fg.query[:-1] if fg.query.endswith(";") else fg.query
@@ -566,8 +573,25 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
                     connector["filters"] = _serialize_filter_expression(
                         join_obj._query._filter, join_obj._query, True
                     )
+    elif fg.time_travel_format == "DELTA":
+        connector["time_travel_type"] = "delta"
+        connector["type"] = fg.storage_connector.type
+        connector["options"] = fg.storage_connector.connector_options()
+        if fg.storage_connector.type == StorageConnector.S3:
+            connector["options"]["path"] = fg.location
+        connector["query"] = ""
+        if query._left_feature_group == fg:
+            connector["filters"] = _serialize_filter_expression(
+                query._filter, query, True
+            )
+        else:
+            for join_obj in query._joins:
+                if join_obj._query._left_feature_group == fg:
+                    connector["filters"] = _serialize_filter_expression(
+                        join_obj._query._filter, join_obj._query, True
+                    )
     else:
-        connector["type"] = "hudi"
+        connector["time_travel_type"] = "hudi"
     return connector
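With these changes every serialized connector entry carries a time_travel_type ("hudi", "delta", or None for external feature groups), and a Delta feature group on S3 additionally ships the S3 credentials plus its location as the "path" option. A hedged sketch of the dictionary produced for a Delta-on-S3 feature group, with placeholder values and assuming StorageConnector.S3 resolves to "S3":

# Illustrative only; real values come from fg.storage_connector.connector_options() and fg.location.
connector = {
    "time_travel_type": "delta",
    "type": "S3",
    "options": {
        "access_key": "<access-key>",
        "secret_key": "<secret-key>",
        "session_token": "<session-token>",
        "region": "eu-north-1",
        "path": "s3://my-bucket/my_fs/fg_transactions_1",  # fg.location
    },
    "query": "",
    # "filters" is filled in via _serialize_filter_expression when the (sub)query has filters.
}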

python/hsfs/feature_group.py

-1

@@ -166,7 +166,6 @@ def __init__(
             )
         else:
             self._storage_connector: "sc.StorageConnector" = storage_connector
-
         self._online_config = (
             OnlineConfig.from_response_json(online_config)
             if isinstance(online_config, dict)

python/hsfs/storage_connector.py

+9

@@ -369,6 +369,15 @@ def prepare_spark(self, path: Optional[str] = None) -> Optional[str]:
         """
         return engine.get_instance().setup_storage_connector(self, path)
 
+    def connector_options(self) -> Dict[str, Any]:
+        """Return options to be passed to an external S3 connector library"""
+        return {
+            "access_key": self.access_key,
+            "secret_key": self.secret_key,
+            "session_token": self.session_token,
+            "region": self.region,
+        }
+
     def read(
         self,
         query: Optional[str] = None,
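connector_options() exposes the S3 connector's credentials as a plain dictionary so an external reader library can consume them. One hypothetical client-side use, not something this commit itself does, is mapping the same keys onto a pyarrow S3 filesystem:

import pyarrow.fs as pafs

# Placeholder credentials; in hsfs these would come from S3Connector.connector_options().
options = {
    "access_key": "<access-key>",
    "secret_key": "<secret-key>",
    "session_token": "<session-token>",
    "region": "eu-north-1",
}

s3 = pafs.S3FileSystem(
    access_key=options["access_key"],
    secret_key=options["secret_key"],
    session_token=options["session_token"],
    region=options["region"],
)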

python/pyproject.toml

+2

@@ -74,6 +74,7 @@ dev-no-opt = [
     "pyspark==3.1.1",
     "moto[s3]==5.0.0",
     "typeguard==4.2.1",
+    "delta-spark==1.0.1"
 ]
 dev-pandas1 = [
     "hopsworks[python]",
@@ -84,6 +85,7 @@ dev-pandas1 = [
     "moto[s3]==5.0.0",
     "pandas<=1.5.3",
     "sqlalchemy<=1.4.48",
+    "delta-spark==1.0.1"
 ]
 dev = ["hopsworks[dev-no-opt,great-expectations,polars]"]
 polars=["polars>=0.20.18,<=0.21.0"]
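delta-spark==1.0.1 goes only into the dev extras so the test suite can build a local Spark session with Delta support next to the pinned pyspark==3.1.1. A minimal sketch of such a session using delta-spark's own helper (the smoke-test path below is arbitrary):

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("hsfs-delta-dev-tests")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip adds the matching delta-core artifact to spark.jars.packages.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Quick smoke test: write and read back a tiny Delta table.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta_smoke")
print(spark.read.format("delta").load("/tmp/delta_smoke").count())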

python/tests/core/test_arrow_flight_client.py

+6 -5

@@ -252,7 +252,7 @@ def test_construct_query_object(self, mocker, backend_fixtures):
                     "right_filter": None,
                 },
             },
-            "connectors": {"test.fg_test_1": {"type": "hudi"}},
+            "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}},
         }
 
         query_object["features"] = {
@@ -293,7 +293,7 @@ def test_construct_query_object_datetime_filter(self, mocker, backend_fixtures):
                 },
                 "right_filter": None,
             },
-            "connectors": {"test.fg_test_1": {"type": "hudi"}},
+            "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}},
         }
 
         query_object["features"] = {
@@ -331,7 +331,7 @@ def test_construct_query_object_without_fs(self, mocker, backend_fixtures):
                 },
                 "right_filter": None,
             },
-            "connectors": {"test.fg_test_1": {"type": "hudi"}},
+            "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}},
         }
 
         query_object["features"] = {
@@ -369,7 +369,7 @@ def test_construct_query_object_without_fs_excluded(self, mocker, backend_fixtures):
                 },
                 "right_filter": None,
             },
-            "connectors": {"test.fg_test_1": {"type": "hudi"}},
+            "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}},
         }
 
         query_object["features"] = {
@@ -430,7 +430,8 @@ def test_construct_query_object_snowflake(self, mocker, backend_fixtures):
             },
             "connectors": {
                 "test.tpch1snowflake_1": {
-                    "type": "SNOWFLAKE",
+                    "time_travel_type": None,
+                    "type": 'SNOWFLAKE',
                     "options": {
                         "user": "test_user",
                         "account": "test_url",
