11
11
# the License.
12
12
13
13
import copy
14
+ from pprint import pprint
14
15
import threading
15
16
from urllib import parse
16
17
from typing import Tuple
17
18
18
19
from ..client .check import is_legal_host , is_legal_port , is_legal_address
19
20
from ..client .grpc_handler import GrpcHandler
20
- from ..client .utils import get_server_type , ZILLIZ
21
21
22
22
from ..settings import Config
23
23
from ..exceptions import ExceptionsMessage , ConnectionConfigException , ConnectionNotExistException
@@ -82,16 +82,25 @@ def __init__(self):
82
82
self ._connected_alias = {}
83
83
self ._connection_references = {}
84
84
self ._con_lock = threading .RLock ()
85
-
86
- address , user , _ , db_name = self .__parse_info (Config .MILVUS_URI )
87
-
88
- default_conn_config = {
89
- "user" : user ,
90
- "address" : address ,
91
- "db_name" : db_name ,
92
- }
93
-
94
- self .add_connection (** {Config .MILVUS_CONN_ALIAS : default_conn_config })
85
+ # info = self.__parse_info(
86
+ # uri=Config.MILVUS_URI,
87
+ # host=Config.DEFAULT_HOST,
88
+ # port=Config.DEFAULT_PORT,
89
+ # user = Config.MILVUS_USER,
90
+ # password = Config.MILVUS_PASSWORD,
91
+ # token = Config.MILVUS_TOKEN,
92
+ # secure=Config.DEFAULT_SECURE,
93
+ # db_name=Config.MILVUS_DB_NAME
94
+ # )
95
+
96
+ # default_conn_config = {
97
+ # "user": info["user"],
98
+ # "address": info["address"],
99
+ # "db_name": info["db_name"],
100
+ # "secure": info["secure"],
101
+ # }
102
+
103
+ # self.add_connection(**{Config.MILVUS_CONN_ALIAS: default_conn_config})
95
104
96
105
def add_connection (self , ** kwargs ):
97
106
""" Configures a milvus connection.
@@ -124,20 +133,21 @@ def add_connection(self, **kwargs):
124
133
)
125
134
"""
126
135
for alias , config in kwargs .items ():
127
- address , user , _ , db_name = self .__parse_info (** config )
136
+ parsed = self .__parse_info (** config )
128
137
129
138
if alias in self ._connected_alias :
130
139
if (
131
- self ._alias [alias ].get ("address" ) != address
132
- or self ._alias [alias ].get ("user" ) != user
133
- or self ._alias [alias ].get ("db_name" ) != db_name
140
+ self ._alias [alias ].get ("address" ) != parsed ["address" ]
141
+ or self ._alias [alias ].get ("user" ) != parsed ["user" ]
142
+ or self ._alias [alias ].get ("db_name" ) != parsed ["db_name" ]
143
+ or self ._alias [alias ].get ("secure" ) != parsed ["secure" ]
134
144
):
135
145
raise ConnectionConfigException (message = ExceptionsMessage .ConnDiffConf % alias )
136
-
137
146
alias_config = {
138
- "address" : address ,
139
- "user" : user ,
140
- "db_name" : db_name ,
147
+ "address" : parsed ["address" ],
148
+ "user" : parsed ["user" ],
149
+ "db_name" : parsed ["db_name" ],
150
+ "secure" : parsed ["secure" ],
141
151
}
142
152
143
153
self ._alias [alias ] = alias_config
@@ -237,18 +247,19 @@ def connect_milvus(**kwargs):
237
247
and connection_details ["address" ] == kwargs ["address" ]
238
248
and connection_details ["user" ] == kwargs ["user" ]
239
249
and connection_details ["db_name" ] == kwargs ["db_name" ]
250
+ and connection_details ["secure" ] == kwargs ["secure" ]
240
251
):
241
252
gh = self ._connected_alias [key ]
242
253
break
243
254
244
255
if gh is None :
245
256
gh = GrpcHandler (** kwargs )
246
- t = kwargs .get ("timeout" )
257
+ t = kwargs .get ("timeout" , None )
247
258
timeout = t if isinstance (t , (int , float )) else Config .MILVUS_CONN_TIMEOUT
248
259
gh ._wait_for_channel_ready (timeout = timeout )
249
260
250
261
kwargs .pop ('password' , None )
251
- kwargs .pop ('secure ' , None )
262
+ kwargs .pop ('token ' , None )
252
263
253
264
self ._connected_alias [alias ] = gh
254
265
@@ -262,36 +273,58 @@ def connect_milvus(**kwargs):
262
273
if not isinstance (alias , str ):
263
274
raise ConnectionConfigException (message = ExceptionsMessage .AliasType % type (alias ))
264
275
276
+ # Grab the relevant info for connection
265
277
address = kwargs .pop ("address" , "" )
266
278
uri = kwargs .pop ("uri" , "" )
267
279
host = kwargs .pop ("host" , "" )
268
280
port = kwargs .pop ("port" , "" )
281
+ secure = kwargs .pop ("secure" , None )
282
+
283
+ # Clean the connection info
284
+ address = '' if address is None else str (address )
285
+ uri = '' if uri is None else str (uri )
286
+ host = '' if host is None else str (host )
287
+ port = '' if port is None else str (port )
269
288
user = '' if user is None else str (user )
270
289
password = '' if password is None else str (password )
290
+ token = '' if token is None else (str (token ))
271
291
db_name = '' if db_name is None else str (db_name )
272
292
273
- if set ([address , uri , host , port ]) != {'' }:
274
- address , user , password , db_name = self .__parse_info (address , uri , host , port , db_name , user , password )
275
- kwargs ["address" ] = address
276
-
277
- elif alias in self ._alias :
293
+ # Replace empties with defaults from enviroment
294
+ uri = uri if uri != '' else Config .MILVUS_URI
295
+ host = host if host != '' else Config .DEFAULT_HOST
296
+ port = port if port != '' else Config .DEFAULT_PORT
297
+ user = user if user != '' else Config .MILVUS_USER
298
+ password = password if password != '' else Config .MILVUS_PASSWORD
299
+ token = token if token != '' else Config .MILVUS_TOKEN
300
+ db_name = db_name if db_name != '' else Config .MILVUS_DB_NAME
301
+
302
+ # If no address info is given, check if an alias exists
303
+ if alias in self ._alias :
278
304
kwargs = dict (self ._alias [alias ].items ())
279
305
# If user is passed in, use it, if not, use previous connections user.
280
306
prev_user = kwargs .pop ("user" )
281
- user = user if user != "" else prev_user
307
+ kwargs ["user" ] = user if user != "" else prev_user
308
+
309
+ # If new secure parameter passed in, use that
310
+ prev_secure = kwargs .pop ("secure" )
311
+ kwargs ["secure" ] = secure if secure is not None else prev_secure
312
+
282
313
# If db_name is passed in, use it, if not, use previous db_name.
283
314
prev_db_name = kwargs .pop ("db_name" )
284
- db_name = db_name if db_name != "" else prev_db_name
315
+ kwargs [ " db_name" ] = db_name if db_name != "" else prev_db_name
285
316
286
- # No params, env, and cached configs for the alias
317
+ # If at least one address info is given, parse it
318
+ elif set ([address , uri , host , port ]) != {'' }:
319
+ secure = secure if secure is not None else Config .DEFAULT_SECURE
320
+ parsed = self .__parse_info (address , uri , host , port , db_name , user , password , token , secure )
321
+ kwargs .update (parsed )
322
+
323
+ # If no details are given and no alias exists
287
324
else :
288
325
raise ConnectionConfigException (message = ExceptionsMessage .ConnLackConf % alias )
289
326
290
- # Set secure=True if username and password are provided
291
- if len (user ) > 0 and len (password ) > 0 :
292
- kwargs ["secure" ] = True
293
-
294
- connect_milvus (** kwargs , user = user , password = password , db_name = db_name )
327
+ connect_milvus (** kwargs )
295
328
296
329
297
330
def list_connections (self ) -> list :
@@ -364,43 +397,40 @@ def __parse_info(
364
397
db_name : str = "" ,
365
398
user : str = "" ,
366
399
password : str = "" ,
400
+ token : str = "" ,
401
+ secure : bool = False ,
367
402
** kwargs ) -> dict :
368
403
369
- passed_in_address = ""
370
- passed_in_user = ""
371
- passed_in_password = ""
372
- passed_in_db_name = ""
373
-
374
- # If uri
404
+ extracted_address = ""
405
+ extracted_user = ""
406
+ extracted_password = ""
407
+ extracted_db_name = ""
408
+ extracted_token = ""
409
+ extracted_secure = None
410
+ # If URI
375
411
if uri != "" :
376
- passed_in_address , passed_in_user , passed_in_password , passed_in_db_name = (
412
+ extracted_address , extracted_user , extracted_password , extracted_db_name , extracted_secure = (
377
413
self .__parse_address_from_uri (uri )
378
414
)
379
-
415
+ # If Address
380
416
elif address != "" :
381
417
if not is_legal_address (address ):
382
418
raise ConnectionConfigException (
383
419
message = f"Illegal address: { address } , should be in form 'localhost:19530'" )
384
- passed_in_address = address
385
-
420
+ extracted_address = address
421
+ # If Host port
386
422
else :
387
- if host == "" :
388
- host = Config .DEFAULT_HOST
389
- if port == "" :
390
- port = Config .DEFAULT_PORT
391
423
self .__verify_host_port (host , port )
392
- passed_in_address = f"{ host } :{ port } "
393
-
394
- passed_in_user = user if passed_in_user == "" else str (passed_in_user )
395
- passed_in_user = Config .MILVUS_USER if passed_in_user == "" else str (passed_in_user )
396
-
397
- passed_in_password = password if passed_in_password == "" else str (passed_in_password )
398
- passed_in_password = Config .MILVUS_PASSWORD if passed_in_password == "" else str (passed_in_password )
399
-
400
- passed_in_db_name = db_name if passed_in_db_name == "" else str (passed_in_db_name )
401
- passed_in_db_name = Config .MILVUS_DB_NAME if passed_in_db_name == "" else str (passed_in_db_name )
424
+ extracted_address = f"{ host } :{ port } "
425
+ ret = {}
426
+ ret ["address" ] = extracted_address
427
+ ret ["user" ] = user if extracted_user == "" else str (extracted_user )
428
+ ret ["password" ] = password if extracted_password == "" else str (extracted_password )
429
+ ret ["db_name" ] = db_name if extracted_db_name == "" else str (extracted_db_name )
430
+ ret ["token" ] = token if extracted_token == "" else str (extracted_token )
431
+ ret ["secure" ] = secure if extracted_secure is None else extracted_secure
402
432
403
- return passed_in_address , passed_in_user , passed_in_password , passed_in_db_name
433
+ return ret
404
434
405
435
def __verify_host_port (self , host , port ):
406
436
if not is_legal_host (host ):
@@ -431,6 +461,7 @@ def __parse_address_from_uri(self, uri: str) -> Tuple[str, str, str, str]:
431
461
port = parsed_uri .port if parsed_uri .port is not None else ""
432
462
user = parsed_uri .username if parsed_uri .username is not None else ""
433
463
password = parsed_uri .password if parsed_uri .password is not None else ""
464
+ secure = parsed_uri .scheme .lower () == "https:"
434
465
435
466
if host == "" :
436
467
raise ConnectionConfigException (message = f"Illegal uri: URI is missing host address: { uri } " )
@@ -443,7 +474,7 @@ def __parse_address_from_uri(self, uri: str) -> Tuple[str, str, str, str]:
443
474
if not is_legal_address (addr ):
444
475
raise ConnectionConfigException (message = illegal_uri_msg .format (uri ))
445
476
446
- return addr , user , password , db_name
477
+ return addr , user , password , db_name , secure
447
478
448
479
449
480
def _fetch_handler (self , alias = Config .MILVUS_CONN_ALIAS ) -> GrpcHandler :
0 commit comments