forked from logicalclocks/hopsworks-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfs_query.py
133 lines (116 loc) · 4.38 KB
/
fs_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#
# Copyright 2020 Logical Clocks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
from typing import Any, Dict, List, Optional, TypeVar, Union
import humps
from hsfs import engine
from hsfs.constructor import external_feature_group_alias, hudi_feature_group_alias
class FsQuery:
    """Query plan object for a constructed feature store query.

    Stores the query string plus optional online and ``pit`` variants, and the
    feature group aliases that the ``register_*`` methods register as
    temporary tables with the active engine. Usually built from a backend
    response via ``from_response_json``.
    """

    def __init__(
        self,
        query: str,
        on_demand_feature_groups: Optional[List[Dict[str, Any]]] = None,
        hudi_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
        query_online: Optional[str] = None,
        pit_query: Optional[str] = None,
        pit_query_asof: Optional[str] = None,
        href: Optional[str] = None,
        expand: Optional[List[str]] = None,
        items: Optional[List[Dict[str, Any]]] = None,
        type: Optional[str] = None,  # noqa: A002 - name must match the response key
        **kwargs,
    ) -> None:
        """Create the query object.

        Args:
            query: The query string.
            on_demand_feature_groups: Raw alias dicts for external (on-demand)
                feature groups. ``None`` or missing means "no aliases"; the
                default lets ``from_response_json`` succeed when the response
                omits the key.
            hudi_cached_feature_groups: Raw alias dicts for Hudi-cached
                feature groups. ``None`` or missing means "no aliases".
            query_online: Optional online variant of the query.
            pit_query: Optional ``pit`` variant of the query (presumably the
                point-in-time join query — confirm with backend).
            pit_query_asof: Optional ``pit`` "as of" variant of the query.
            href, expand, items, type, **kwargs: Accepted so every key of a
                decamelized response can be passed through; not stored.
        """
        self._query = query
        self._query_online = query_online
        self._pit_query = pit_query
        self._pit_query_asof = pit_query_asof
        # Treat a missing/None list exactly like an empty one.
        self._on_demand_fg_aliases = [
            external_feature_group_alias.ExternalFeatureGroupAlias.from_response_json(
                fg
            )
            for fg in on_demand_feature_groups or []
        ]
        self._hudi_cached_feature_groups = [
            hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg)
            for fg in hudi_cached_feature_groups or []
        ]

    @classmethod
    def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery":
        """Build an ``FsQuery`` from a camelCase response dict.

        Keys are converted to snake_case with ``humps.decamelize`` and passed
        as keyword arguments; keys without a matching parameter are absorbed
        by ``**kwargs``.
        """
        json_decamelized = humps.decamelize(json_dict)
        return cls(**json_decamelized)

    @property
    def query(self) -> str:
        """The query string."""
        return self._query

    @property
    def query_online(self) -> Optional[str]:
        """Online variant of the query, or ``None`` if not provided."""
        return self._query_online

    @property
    def pit_query(self) -> Optional[str]:
        """``pit`` variant of the query, or ``None`` if not provided."""
        return self._pit_query

    @property
    def pit_query_asof(self) -> Optional[str]:
        """``pit`` "as of" variant of the query, or ``None`` if not provided."""
        return self._pit_query_asof

    @property
    def on_demand_fg_aliases(
        self,
    ) -> List["external_feature_group_alias.ExternalFeatureGroupAlias"]:
        """Parsed aliases of external (on-demand) feature groups; may be empty."""
        return self._on_demand_fg_aliases

    @property
    def hudi_cached_feature_groups(
        self,
    ) -> List["hudi_feature_group_alias.HudiFeatureGroupAlias"]:
        """Parsed aliases of Hudi-cached feature groups; may be empty."""
        return self._hudi_cached_feature_groups

    def register_external(
        self,
        spine: Optional[
            Union[TypeVar("pyspark.sql.DataFrame"), TypeVar("pyspark.RDD")]
        ] = None,
    ) -> None:
        """Register every external feature group alias as a temporary table.

        Args:
            spine: Dataframe assigned to the ``dataframe`` attribute of any
                aliased feature group whose class is named ``SpineGroup``,
                before that group is registered.
        """
        for external_fg_alias in self._on_demand_fg_aliases or []:
            # Compared by class name rather than isinstance, presumably to
            # avoid importing the spine group module here — confirm.
            if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup":
                external_fg_alias.on_demand_feature_group.dataframe = spine
            engine.get_instance().register_external_temporary_table(
                external_fg_alias.on_demand_feature_group,
                external_fg_alias.alias,
            )

    def register_hudi_tables(
        self,
        feature_store_id: int,
        feature_store_name: str,
        read_options: Optional[Dict[str, Any]],
    ) -> None:
        """Register each Hudi-cached feature group alias as a temporary table."""
        for fg_alias in self._hudi_cached_feature_groups:
            engine.get_instance().register_hudi_temporary_table(
                fg_alias, feature_store_id, feature_store_name, read_options
            )

    def register_delta_tables(
        self,
        feature_store_id: int,
        feature_store_name: str,
        read_options: Optional[Dict[str, Any]],
    ) -> None:
        """Register the cached feature group aliases as Delta temporary tables."""
        # NOTE(review): Delta tables are sourced from the same
        # hudi_cached_feature_groups list as Hudi tables; confirm the backend
        # reports Delta-backed feature groups under that key.
        for fg_alias in self._hudi_cached_feature_groups:
            engine.get_instance().register_delta_temporary_table(
                fg_alias, feature_store_id, feature_store_name, read_options
            )