Skip to content

Commit fbfc07a

Browse files
authored
[FSTORE-1047] Support Similarity Search in the Feature Store (#1139)
* fall back to use raw feature name cache required sk embedding fg right join fv get vector fv find neighbour write to single index read embedding metadata read embedding create embedding * check embedding in query read * get_feature_vectors * fix filter entry by join index * exclude fg from serving * fix hw login * license * opensearch singleton * rename to embedding_index * reformat * reformat * assign feature store to feature group * fix serving key in helper column * reformat * fix test * address comments * minor fix * minor fix * read offline embedding * read offline embedding * fix pyspark test * reformat * remove hopsworks dependency * reformat * fix elastic url * fix elastic url * fix style * fix style
1 parent 60bfb1a commit fbfc07a

18 files changed

+840
-33
lines changed

python/hsfs/connection.py

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from requests.exceptions import ConnectionError
2121

2222
from hsfs.decorators import connected, not_connected
23+
from hsfs.core.opensearch import OpenSearchClientSingleton
2324
from hsfs import engine, client, util, usage
2425
from hsfs.core import (
2526
feature_store_api,
@@ -276,6 +277,7 @@ def close(self):
276277
conn.close()
277278
```
278279
"""
280+
OpenSearchClientSingleton().close()
279281
client.stop()
280282
self._feature_store_api = None
281283
engine.stop()

python/hsfs/constructor/query.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def read(
138138
"""
139139
if not read_options:
140140
read_options = {}
141-
141+
self._check_read_supported(online)
142142
sql_query, online_conn = self._prep_read(online, read_options)
143143

144144
schema = None
@@ -179,6 +179,7 @@ def show(self, n: int, online: Optional[bool] = False):
179179
n: Number of rows to show.
180180
online: Show from online storage. Defaults to `False`.
181181
"""
182+
self._check_read_supported(online)
182183
read_options = {}
183184
sql_query, online_conn = self._prep_read(online, read_options)
184185

@@ -474,6 +475,16 @@ def from_response_json(cls, json_dict):
474475
filter=json_decamelized.get("filter", None),
475476
)
476477

478+
def _check_read_supported(self, online):
    """Reject online reads of a query that involves an embedding feature group.

    Offline reads (`online` falsy) are always accepted. For online reads, every
    feature group in `self.featuregroups` is inspected and the read is refused
    as soon as one of them has a truthy `embedding_index`.

    # Arguments
        online: Whether the read targets the online storage.

    # Raises
        `FeatureStoreException`: If `online` is truthy and at least one feature
            group of the query has an embedding index.
    """
    if not online:
        return
    if any(fg.embedding_index for fg in self.featuregroups):
        raise FeatureStoreException(
            "Reading from query containing embedding is not supported."
            " Use `feature_view.get_feature_vector(s)` instead."
        )
487+
477488
@classmethod
478489
def _hopsworks_json(cls, json_dict):
479490
"""

python/hsfs/core/opensearch.py

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#
2+
# Copyright 2023 Logical Clocks AB
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
from hsfs.client.exceptions import FeatureStoreException
18+
import logging
19+
from hsfs.core.opensearch_api import OpenSearchApi
20+
from hsfs import client
21+
22+
23+
class OpenSearchClientSingleton:
    """Process-wide holder of one shared `opensearchpy.OpenSearch` client.

    Every `OpenSearchClientSingleton()` call returns the same object. The
    wrapped client is created from `OpenSearchApi(...).get_default_py_config()`
    using the project id and name of the current `hsfs.client` instance.
    """

    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(OpenSearchClientSingleton, cls).__new__(cls)
            cls._instance._opensearch_client = None
        # Run the setup on every construction, not only the first one: it is a
        # no-op while a client exists, and it retries when an earlier attempt
        # failed or the client was closed. Otherwise a failed first attempt
        # would leave a cached instance without a usable client.
        cls._instance._setup_opensearch_client()
        return cls._instance

    def _setup_opensearch_client(self):
        """Create the OpenSearch client unless one already exists.

        # Raises
            `FeatureStoreException`: If `opensearchpy` cannot be imported.
        """
        if self._opensearch_client:
            return
        try:
            from opensearchpy import OpenSearch
            from opensearchpy.exceptions import (
                ConnectionError as OpenSearchConnectionError,
            )

            # Kept on the instance so `search` can catch it without importing
            # `opensearchpy` at module level.
            self.OpenSearchConnectionError = OpenSearchConnectionError
        except ModuleNotFoundError as err:
            raise FeatureStoreException(
                "hopsworks and opensearchpy are required for embedding similarity search"
            ) from err
        # query log is at INFO level
        # 2023-11-24 15:10:49,470 INFO: POST https://localhost:9200/index/_search [status:200 request:0.041s]
        logging.getLogger("opensearchpy").setLevel(logging.WARNING)
        self._opensearch_client = OpenSearch(
            **OpenSearchApi(
                client.get_instance()._project_id,
                client.get_instance()._project_name,
            ).get_default_py_config()
        )

    def _refresh_opensearch_connection(self):
        """Close the current client, if any, and build a new one."""
        self.close()
        self._setup_opensearch_client()

    def search(self, index=None, body=None):
        """Run a search request, retrying once after a connection error.

        # Arguments
            index: Passed as `index` to the client's `search` method.
            body: Passed as `body` to the client's `search` method.

        # Returns
            Whatever the client's `search` method returns.
        """
        # Recreate the client if `close` was called on this shared instance.
        self._setup_opensearch_client()
        try:
            return self._opensearch_client.search(body=body, index=index)
        except self.OpenSearchConnectionError:
            self._refresh_opensearch_connection()
            return self._opensearch_client.search(body=body, index=index)

    def close(self):
        """Close the client, if any, and forget it.

        Dropping the reference lets the next construction or `search` build a
        fresh client instead of reusing a closed one.
        """
        if self._opensearch_client:
            self._opensearch_client.close()
            self._opensearch_client = None

python/hsfs/core/opensearch_api.py

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#
2+
# Copyright 2023 Logical Clocks AB
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
from furl import furl
18+
19+
from hsfs import client
20+
from hsfs.client.exceptions import FeatureStoreException
21+
from hsfs.core.variable_api import VariableApi
22+
23+
24+
class OPENSEARCH_CONFIG:
    """String constants naming OpenSearch/Elasticsearch configuration keys."""

    # Environment variable name.
    ELASTIC_ENDPOINT_ENV_VAR = "ELASTIC_ENDPOINT"

    # Keys in the dotted "es.*" form.
    SSL_CONFIG = "es.net.ssl"
    NODES_WAN_ONLY = "es.nodes.wan.only"
    NODES = "es.nodes"
    SSL_KEYSTORE_LOCATION = "es.net.ssl.keystore.location"
    SSL_KEYSTORE_PASSWORD = "es.net.ssl.keystore.pass"
    SSL_TRUSTSTORE_LOCATION = "es.net.ssl.truststore.location"
    SSL_TRUSTSTORE_PASSWORD = "es.net.ssl.truststore.pass"
    HTTP_AUTHORIZATION = "es.net.http.header.Authorization"
    INDEX = "es.resource"

    # Keyword names used for the dictionary built by
    # `OpenSearchApi.get_default_py_config`.
    HOSTS = "hosts"
    HTTP_COMPRESS = "http_compress"
    HEADERS = "headers"
    USE_SSL = "use_ssl"
    VERIFY_CERTS = "verify_certs"
    SSL_ASSERT_HOSTNAME = "ssl_assert_hostname"
    CA_CERTS = "ca_certs"
42+
43+
44+
class OpenSearchApi:
    """Build the settings needed to connect to a project's OpenSearch service."""

    def __init__(
        self,
        project_id,
        project_name,
    ):
        """Store the project identity and create the variable API helper.

        # Arguments
            project_id: Used as the last path segment of the token request.
            project_name: Used as prefix for project index names.
        """
        self._project_id = project_id
        self._project_name = project_name
        self._variable_api = VariableApi()

    def _get_opensearch_url(self):
        """Return the OpenSearch base URL (https, port 9200) for this client.

        External clients use the load balancer external domain; all other
        clients use the `rest.elastic.service` host under the service
        discovery domain.

        # Returns
            `str`: The OpenSearch URL.

        # Raises
            `FeatureStoreException`: If the required domain variable is empty
                or missing.
        """
        if isinstance(client.get_instance(), client.external.Client):
            external_domain = self._variable_api.get_loadbalancer_external_domain()
            # Falsiness check so a missing (None) value is rejected as well,
            # instead of producing "https://None:9200".
            if not external_domain:
                raise FeatureStoreException(
                    "External client could not locate loadbalancer_external_domain "
                    "in cluster configuration or variable is empty."
                )
            return f"https://{external_domain}:9200"

        service_discovery_domain = self._variable_api.get_service_discovery_domain()
        if not service_discovery_domain:
            raise FeatureStoreException(
                "Client could not locate service_discovery_domain "
                "in cluster configuration or variable is empty."
            )
        return f"https://rest.elastic.service.{service_discovery_domain}:9200"

    def get_project_index(self, index):
        """Prefix an index name with the project name to avoid index name clashes.

        # Arguments
            index: The opensearch index to interact with.

        # Returns
            `str`: `<project name>_<index>`, lower-cased.
        """
        return (self._project_name + "_" + index).lower()

    def get_default_py_config(self):
        """Get the configuration required to set up a connection using the *opensearch-py* library.

        ```python
        import hopsworks
        from opensearchpy import OpenSearch

        project = hopsworks.login()

        opensearch_api = project.get_opensearch_api()

        client = OpenSearch(**opensearch_api.get_default_py_config())
        ```

        # Returns
            `dict`: Keyword arguments for `OpenSearch(...)`: the host and port
                of the OpenSearch URL, an `Authorization` header carrying the
                token from `_get_authorization_token`, SSL enabled with
                certificate verification against the client's CA chain file,
                hostname assertion disabled and HTTP compression disabled.
        """
        url = furl(self._get_opensearch_url())
        return {
            OPENSEARCH_CONFIG.HOSTS: [{"host": url.host, "port": url.port}],
            OPENSEARCH_CONFIG.HTTP_COMPRESS: False,
            OPENSEARCH_CONFIG.HEADERS: {
                "Authorization": self._get_authorization_token()
            },
            OPENSEARCH_CONFIG.USE_SSL: True,
            OPENSEARCH_CONFIG.VERIFY_CERTS: True,
            OPENSEARCH_CONFIG.SSL_ASSERT_HOSTNAME: False,
            OPENSEARCH_CONFIG.CA_CERTS: client.get_instance()._get_ca_chain_path(),
        }

    def _get_authorization_token(self):
        """Get opensearch jwt token.

        Sends a GET request for the path `elastic/jwt/<project id>` through the
        current client and reads the `token` field of the response.

        # Returns
            `str`: OpenSearch jwt token
        # Raises
            `RestAPIError`: If unable to get the token
        """
        _client = client.get_instance()
        path_params = ["elastic", "jwt", self._project_id]

        headers = {"content-type": "application/json"}
        return _client._send_request("GET", path_params, headers=headers)["token"]

0 commit comments

Comments
 (0)