@@ -78,6 +78,8 @@ def __init__(self):
78
78
"""
79
79
self ._alias = {}
80
80
self ._connected_alias = {}
81
+ self ._connection_references = {}
82
+ self ._con_lock = threading .RLock ()
81
83
self ._env_uri = None
82
84
83
85
if Config .MILVUS_URI != "" :
@@ -199,8 +201,13 @@ def disconnect(self, alias: str):
199
201
if not isinstance (alias , str ):
200
202
raise ConnectionConfigException (message = ExceptionsMessage .AliasType % type (alias ))
201
203
202
- if alias in self ._connected_alias :
203
- self ._connected_alias .pop (alias ).close ()
204
+ with self ._con_lock :
205
+ if alias in self ._connected_alias :
206
+ gh = self ._connected_alias .pop (alias )
207
+ self ._connection_references [id (gh )] -= 1
208
+ if self ._connection_references [id (gh )] <= 0 :
209
+ gh .close ()
210
+ del self ._connection_references [id (gh )]
204
211
205
212
def remove_connection (self , alias : str ):
206
213
""" Removes connection from the registry.
@@ -263,17 +270,34 @@ def connect(self, alias=Config.MILVUS_CONN_ALIAS, user="", password="", **kwargs
263
270
>>> connections.connect("test", host="localhost", port="19530")
264
271
"""
265
272
def connect_milvus (** kwargs ):
266
- gh = GrpcHandler (** kwargs )
267
-
268
- t = kwargs .get ("timeout" )
269
- timeout = t if isinstance (t , (int , float )) else Config .MILVUS_CONN_TIMEOUT
270
-
271
- gh ._wait_for_channel_ready (timeout = timeout )
272
- kwargs .pop ('password' )
273
- kwargs .pop ('secure' , None )
274
-
275
- self ._connected_alias [alias ] = gh
276
- self ._alias [alias ] = copy .deepcopy (kwargs )
273
+ with self ._con_lock :
274
+ gh = None
275
+ for key , connection_details in self ._alias .items ():
276
+
277
+ if (
278
+ connection_details ["address" ] == kwargs ["address" ]
279
+ and connection_details ["user" ] == kwargs ["user" ]
280
+ and key in self ._connected_alias
281
+ ):
282
+ gh = self ._connected_alias [key ]
283
+ break
284
+
285
+ if gh is None :
286
+ gh = GrpcHandler (** kwargs )
287
+ t = kwargs .get ("timeout" )
288
+ timeout = t if isinstance (t , (int , float )) else Config .MILVUS_CONN_TIMEOUT
289
+ gh ._wait_for_channel_ready (timeout = timeout )
290
+
291
+ kwargs .pop ('password' , None )
292
+ kwargs .pop ('secure' , None )
293
+
294
+ self ._connected_alias [alias ] = gh
295
+ self ._alias [alias ] = copy .deepcopy (kwargs )
296
+
297
+ if id (gh ) not in self ._connection_references :
298
+ self ._connection_references [id (gh )] = 1
299
+ else :
300
+ self ._connection_references [id (gh )] += 1
277
301
278
302
def with_config (config : Tuple ) -> bool :
279
303
for c in config :
@@ -293,11 +317,8 @@ def with_config(config: Tuple) -> bool:
293
317
)
294
318
295
319
# Make sure passed in None doesnt break
296
- user = user or ""
297
- password = password or ""
298
- # Make sure passed in are Strings
299
- user = str (user )
300
- password = str (password )
320
+ user = '' if user is None else str (user )
321
+ password = '' if password is None else str (password )
301
322
302
323
# 1st Priority: connection from params
303
324
if with_config (config ):
@@ -313,36 +334,32 @@ def with_config(config: Tuple) -> bool:
313
334
user = parsed_uri .username if parsed_uri .username is not None else user
314
335
password = parsed_uri .password if parsed_uri .password is not None else password
315
336
316
- # Set secure=True if username and password are provided
317
- if len (user ) > 0 and len (password ) > 0 :
318
- kwargs ["secure" ] = True
319
-
320
- connect_milvus (** kwargs , user = user , password = password )
321
- return
322
337
323
338
# 2nd Priority, connection configs from env
324
- if self ._env_uri is not None :
339
+ elif self ._env_uri is not None :
325
340
addr , parsed_uri = self ._env_uri
326
341
kwargs ["address" ] = addr
327
342
328
343
user = parsed_uri .username if parsed_uri .username is not None else ""
329
344
password = parsed_uri .password if parsed_uri .password is not None else ""
330
- # Set secure=True if uri provided user and password
331
- if len (user ) > 0 and len (password ) > 0 :
332
- kwargs ["secure" ] = True
333
345
334
- connect_milvus (** kwargs , user = user , password = password )
335
- return
336
346
337
347
# 3rd Priority, connect to cached configs with provided user and password
338
- if alias in self ._alias :
339
- connect_alias = dict (self ._alias [alias ].items ())
340
- connect_alias [ " user" ] = user
341
- connect_milvus ( ** connect_alias , password = password , ** kwargs )
342
- return
348
+ elif alias in self ._alias :
349
+ kwargs = dict (self ._alias [alias ].items ())
350
+ # If user is passed in, use it, if not, use previous connections user.
351
+ prev_user = kwargs . pop ( "user" )
352
+ user = user if user != "" else prev_user
343
353
344
354
# No params, env, and cached configs for the alias
345
- raise ConnectionConfigException (message = ExceptionsMessage .ConnLackConf % alias )
355
+ else :
356
+ raise ConnectionConfigException (message = ExceptionsMessage .ConnLackConf % alias )
357
+
358
+ # Set secure=True if username and password are provided
359
+ if len (user ) > 0 and len (password ) > 0 :
360
+ kwargs ["secure" ] = True
361
+
362
+ connect_milvus (** kwargs , user = user , password = password )
346
363
347
364
348
365
def list_connections (self ) -> list :
@@ -357,7 +374,8 @@ def list_connections(self) -> list:
357
374
>>> connections.list_connections()
358
375
// TODO [('default', None), ('test', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f05003f3e80>)]
359
376
"""
360
- return [(k , self ._connected_alias .get (k , None )) for k in self ._alias ]
377
+ with self ._con_lock :
378
+ return [(k , self ._connected_alias .get (k , None )) for k in self ._alias ]
361
379
362
380
def get_connection_addr (self , alias : str ):
363
381
"""
@@ -402,7 +420,8 @@ def has_connection(self, alias: str) -> bool:
402
420
"""
403
421
if not isinstance (alias , str ):
404
422
raise ConnectionConfigException (message = ExceptionsMessage .AliasType % type (alias ))
405
- return alias in self ._connected_alias
423
+ with self ._con_lock :
424
+ return alias in self ._connected_alias
406
425
407
426
def _fetch_handler (self , alias = Config .MILVUS_CONN_ALIAS ) -> GrpcHandler :
408
427
""" Retrieves a GrpcHandler by alias. """
0 commit comments