Skip to content

Commit 17b21e3

Browse files
author
Filip Haltmayer
committed
reuse grpc channel
Signed-off-by: Filip Haltmayer <filip.haltmayer@zilliz.com>
1 parent 16e35e8 commit 17b21e3

File tree

1 file changed

+56
-38
lines changed

1 file changed

+56
-38
lines changed

Diff for: pymilvus/orm/connections.py

+56-38
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ def __init__(self):
7878
"""
7979
self._alias = {}
8080
self._connected_alias = {}
81+
self._connection_references = {}
82+
self._con_lock = threading.RLock()
8183
self._env_uri = None
8284

8385
if Config.MILVUS_URI != "":
@@ -199,8 +201,13 @@ def disconnect(self, alias: str):
199201
if not isinstance(alias, str):
200202
raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))
201203

202-
if alias in self._connected_alias:
203-
self._connected_alias.pop(alias).close()
204+
with self._con_lock:
205+
if alias in self._connected_alias:
206+
gh = self._connected_alias.pop(alias)
207+
self._connection_references[id(gh)] -= 1
208+
if self._connection_references[id(gh)] <= 0:
209+
gh.close()
210+
del self._connection_references[id(gh)]
204211

205212
def remove_connection(self, alias: str):
206213
""" Removes connection from the registry.
@@ -263,17 +270,34 @@ def connect(self, alias=Config.MILVUS_CONN_ALIAS, user="", password="", **kwargs
263270
>>> connections.connect("test", host="localhost", port="19530")
264271
"""
265272
def connect_milvus(**kwargs):
266-
gh = GrpcHandler(**kwargs)
267-
268-
t = kwargs.get("timeout")
269-
timeout = t if isinstance(t, (int, float)) else Config.MILVUS_CONN_TIMEOUT
270-
271-
gh._wait_for_channel_ready(timeout=timeout)
272-
kwargs.pop('password')
273-
kwargs.pop('secure', None)
274-
275-
self._connected_alias[alias] = gh
276-
self._alias[alias] = copy.deepcopy(kwargs)
273+
with self._con_lock:
274+
gh = None
275+
for key, connection_details in self._alias.items():
276+
277+
if (
278+
connection_details["address"] == kwargs["address"]
279+
and connection_details["user"] == kwargs["user"]
280+
and key in self._connected_alias
281+
):
282+
gh = self._connected_alias[key]
283+
break
284+
285+
if gh is None:
286+
gh = GrpcHandler(**kwargs)
287+
t = kwargs.get("timeout")
288+
timeout = t if isinstance(t, (int, float)) else Config.MILVUS_CONN_TIMEOUT
289+
gh._wait_for_channel_ready(timeout=timeout)
290+
291+
kwargs.pop('password', None)
292+
kwargs.pop('secure', None)
293+
294+
self._connected_alias[alias] = gh
295+
self._alias[alias] = copy.deepcopy(kwargs)
296+
297+
if id(gh) not in self._connection_references:
298+
self._connection_references[id(gh)] = 1
299+
else:
300+
self._connection_references[id(gh)] += 1
277301

278302
def with_config(config: Tuple) -> bool:
279303
for c in config:
@@ -293,11 +317,8 @@ def with_config(config: Tuple) -> bool:
293317
)
294318

295319
# Make sure passed in None doesnt break
296-
user = user or ""
297-
password = password or ""
298-
# Make sure passed in are Strings
299-
user = str(user)
300-
password = str(password)
320+
user = '' if user is None else str(user)
321+
password = '' if password is None else str(password)
301322

302323
# 1st Priority: connection from params
303324
if with_config(config):
@@ -313,36 +334,31 @@ def with_config(config: Tuple) -> bool:
313334
user = parsed_uri.username if parsed_uri.username is not None else user
314335
password = parsed_uri.password if parsed_uri.password is not None else password
315336

316-
# Set secure=True if username and password are provided
317-
if len(user) > 0 and len(password) > 0:
318-
kwargs["secure"] = True
319-
320-
connect_milvus(**kwargs, user=user, password=password)
321-
return
322337

323338
# 2nd Priority, connection configs from env
324-
if self._env_uri is not None:
339+
elif self._env_uri is not None:
325340
addr, parsed_uri = self._env_uri
326341
kwargs["address"] = addr
327342

328343
user = parsed_uri.username if parsed_uri.username is not None else ""
329344
password = parsed_uri.password if parsed_uri.password is not None else ""
330-
# Set secure=True if uri provided user and password
331-
if len(user) > 0 and len(password) > 0:
332-
kwargs["secure"] = True
333345

334-
connect_milvus(**kwargs, user=user, password=password)
335-
return
336346

337347
# 3rd Priority, connect to cached configs with provided user and password
338-
if alias in self._alias:
339-
connect_alias = dict(self._alias[alias].items())
340-
connect_alias["user"] = user
341-
connect_milvus(**connect_alias, password=password, **kwargs)
342-
return
348+
elif alias in self._alias:
349+
kwargs = dict(self._alias[alias].items())
350+
# If user is passed in, use it, if not, use previous connections user.
351+
user = user if user != "" else kwargs.pop("user")
343352

344353
# No params, env, and cached configs for the alias
345-
raise ConnectionConfigException(message=ExceptionsMessage.ConnLackConf % alias)
354+
else:
355+
raise ConnectionConfigException(message=ExceptionsMessage.ConnLackConf % alias)
356+
357+
# Set secure=True if username and password are provided
358+
if len(user) > 0 and len(password) > 0:
359+
kwargs["secure"] = True
360+
361+
connect_milvus(**kwargs, user=user, password=password)
346362

347363

348364
def list_connections(self) -> list:
@@ -357,7 +373,8 @@ def list_connections(self) -> list:
357373
>>> connections.list_connections()
358374
// TODO [('default', None), ('test', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f05003f3e80>)]
359375
"""
360-
return [(k, self._connected_alias.get(k, None)) for k in self._alias]
376+
with self._con_lock:
377+
return [(k, self._connected_alias.get(k, None)) for k in self._alias]
361378

362379
def get_connection_addr(self, alias: str):
363380
"""
@@ -402,7 +419,8 @@ def has_connection(self, alias: str) -> bool:
402419
"""
403420
if not isinstance(alias, str):
404421
raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))
405-
return alias in self._connected_alias
422+
with self._con_lock:
423+
return alias in self._connected_alias
406424

407425
def _fetch_handler(self, alias=Config.MILVUS_CONN_ALIAS) -> GrpcHandler:
408426
""" Retrieves a GrpcHandler by alias. """

0 commit comments

Comments
 (0)